Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: changes stream #297

Merged
merged 1 commit into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
return task;
}

public static createChangesStream(targetName: string, since = ''): ChangesStreamTask {
public static createChangesStream(targetName: string, registryId = '', since = ''): ChangesStreamTask {
const data = {
type: TaskType.ChangesStream,
state: TaskState.Waiting,
Expand All @@ -146,6 +146,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
data: {
// task execute worker
taskWorker: '',
registryId,
since,
},
};
Expand Down
13 changes: 12 additions & 1 deletion app/core/service/ChangesStreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ export class ChangesStreamService extends AbstractService {
for await (const change of stream) {
const { fullname, seq } = change as ChangesStreamChange;
lastPackage = fullname;
lastSince = seq;
const valid = await this.needSync(registry, fullname);
if (valid) {
taskCount++;
lastSince = seq;
await this.packageSyncerService.createTask(fullname, {
authorIp: HOST_NAME,
authorId: 'ChangesStreamService',
Expand All @@ -185,6 +185,17 @@ export class ChangesStreamService extends AbstractService {
}
}

// 如果没有需要同步的任务,也更新一下 taskData 里的信息
if (taskCount === 0) {
// 实时更新 task 信息
task.updateSyncData({
lastSince,
lastPackage,
taskCount,
});
await this.taskRepository.saveTask(task);
}

return { lastSince, taskCount };
}
}
2 changes: 1 addition & 1 deletion app/core/service/RegistryManagerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class RegistryManagerService extends AbstractService {

// 启动 changeStream
const targetName = `${registry.name.toUpperCase()}_WORKER`;
await this.taskService.createTask(Task.createChangesStream(targetName, since), false);
await this.taskService.createTask(Task.createChangesStream(targetName, registryId, since), false);
}

async createRegistry(createCmd: CreateRegistryCmd): Promise<Registry> {
Expand Down
4 changes: 2 additions & 2 deletions app/repository/model/HistoryTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';
import { TaskState, TaskType } from '../../common/enum/Task';

@Model()
Expand Down Expand Up @@ -48,6 +48,6 @@ export class HistoryTask extends Bone {
@Attribute(DataTypes.INTEGER)
attempts: number;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
error: string;
}
4 changes: 2 additions & 2 deletions app/repository/model/PackageVersionBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';

@Model()
export class PackageVersionBlock extends Bone {
Expand All @@ -26,6 +26,6 @@ export class PackageVersionBlock extends Bone {
@Attribute(DataTypes.STRING(256))
version: string;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
reason: string;
}
4 changes: 2 additions & 2 deletions app/repository/model/Task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';
import { TaskState, TaskType } from '../../common/enum/Task';

@Model()
Expand Down Expand Up @@ -48,7 +48,7 @@ export class Task extends Bone {
@Attribute(DataTypes.INTEGER)
attempts: number;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
error: string;

@Attribute(DataTypes.STRING(48), {
Expand Down
20 changes: 19 additions & 1 deletion test/core/service/ChangesStreamService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
registryManagerService = await ctx.getEggObject(RegistryManagerService);
scopeManagerService = await ctx.getEggObject(ScopeManagerService);
assert(changesStreamService);
task = Task.createChangesStream('GLOBAL_WORKER', '9527');
task = Task.createChangesStream('GLOBAL_WORKER', '', '9527');
taskService.createTask(task, false);

// create default registry
Expand Down Expand Up @@ -163,6 +163,24 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
assert(changes.taskCount === 2);
assert(changes.lastSince === '3');
});

it('should update since even not taskCount', async () => {
mock(ChangesStreamService.prototype, 'needSync', async () => {
return false;
});
mock(ctx.httpclient, 'request', async () => {
return {
res: Readable.from(`
{"seq":2,"id":"backbone.websql.deferred","changes":[{"rev":"4-f5150b238ab62cd890211fb57fc9eca5"}],"deleted":true},
{"seq":3,"id":"backbone2.websql.deferred","changes":[{"rev":"4-f6150b238ab62cd890211fb57fc9eca5"}],"deleted":true},
`),
};
});
const changes = await changesStreamService.executeSync('1', task);
assert(changes.taskCount === 0);
assert(changes.lastSince === '3');
assert(task.data.since === '3');
});
});

});
1 change: 1 addition & 0 deletions test/core/service/RegistryManagerService/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ describe('test/core/service/RegistryManagerService/index.test.ts', () => {
const targetName = 'CUSTOM_WORKER';
const task = await taskRepository.findTaskByTargetName(targetName, TaskType.ChangesStream);
assert(task);
assert((task.data as ChangesStreamTaskData).registryId === registry.registryId);
});

it('should preCheck registry', async () => {
Expand Down