diff --git a/app/common/constants.ts b/app/common/constants.ts index ef63ba49..f77c1515 100644 --- a/app/common/constants.ts +++ b/app/common/constants.ts @@ -1,2 +1,3 @@ export const BUG_VERSIONS = 'bug-versions'; export const LATEST_TAG = 'latest'; +export const GLOBAL_WORKER = 'GLOBAL_WORKER'; diff --git a/app/core/service/CacheService.ts b/app/core/service/CacheService.ts index 053b8575..76e50c38 100644 --- a/app/core/service/CacheService.ts +++ b/app/core/service/CacheService.ts @@ -5,28 +5,38 @@ import { } from '@eggjs/tegg'; import { CacheAdapter } from '../../common/adapter/CacheAdapter'; import { AbstractService } from '../../common/AbstractService'; +import { ChangesStreamTaskData } from '../entity/Task'; type PackageCacheAttribe = 'etag' | 'manifests'; -type TotalData = { +export type UpstreamRegistryInfo = { + registry_name: string; + source_registry: string; + changes_stream_url: string; +} & ChangesStreamTaskData; + +export type DownloadInfo = { + today: number; + yesterday: number; + samedayLastweek: number; + thisweek: number; + thismonth: number; + thisyear: number; + lastweek: number; + lastmonth: number; + lastyear: number; +}; + +export type TotalData = { packageCount: number; packageVersionCount: number; lastPackage: string; lastPackageVersion: string; - download: { - today: number; - yesterday: number; - samedayLastweek: number; - thisweek: number; - thismonth: number; - thisyear: number; - lastweek: number; - lastmonth: number; - lastyear: number; - }; - changesStream: object, + download: DownloadInfo; + changesStream: ChangesStreamTaskData; lastChangeId: number | bigint; cacheTime: string; + upstreamRegistries: UpstreamRegistryInfo[]; }; const TOTAL_DATA_KEY = '__TOTAL_DATA__'; @@ -72,6 +82,7 @@ export class CacheService extends AbstractService { lastyear: 0, }, changesStream: {}, + upstreamRegistries: [], lastChangeId: 0, cacheTime: '', }; diff --git a/app/core/service/ChangesStreamService.ts b/app/core/service/ChangesStreamService.ts index 68046ce7..94703293 100644 --- a/app/core/service/ChangesStreamService.ts +++ b/app/core/service/ChangesStreamService.ts @@ -18,6 +18,7 @@ import { E500 } from 'egg-errors'; import { Registry } from '../entity/Registry'; import { AbstractChangeStream } from '../../common/adapter/changesStream/AbstractChangesStream'; import { getScopeAndName } from '../../common/PackageUtil'; +import { GLOBAL_WORKER } from '../../common/constants'; import { ScopeManagerService } from './ScopeManagerService'; import { PackageRepository } from '../../repository/PackageRepository'; @@ -44,7 +45,7 @@ export class ChangesStreamService extends AbstractService { // GLOBAL_WORKER: 默认的同步源 // `{registryName}_WORKER`: 自定义 scope 的同步源 public async findExecuteTask(): Promise { - const targetName = 'GLOBAL_WORKER'; + const targetName = GLOBAL_WORKER; const globalRegistryTask = await this.taskRepository.findTaskByTargetName(targetName, TaskType.ChangesStream); // 如果没有配置默认同步源,先进行初始化 if (!globalRegistryTask) { diff --git a/app/port/controller/HomeController.ts b/app/port/controller/HomeController.ts index 2c056204..8950166f 100644 --- a/app/port/controller/HomeController.ts +++ b/app/port/controller/HomeController.ts @@ -8,10 +8,44 @@ import { Inject, } from '@eggjs/tegg'; import { AbstractController } from './AbstractController'; -import { CacheService } from '../../core/service/CacheService'; +import { CacheService, DownloadInfo, UpstreamRegistryInfo } from '../../core/service/CacheService'; const startTime = new Date(); +// registry 站点信息数据 SiteTotalData +// SiteEnvInfo: 环境、运行时相关信息,实时查询 +// UpstreamInfo: 上游信息,实时查询 +// TotalInfo: 总数据信息,定时任务每分钟生成 +// LegacyInfo: 旧版兼容信息 +type SiteTotalData = LegacyInfo & SiteEnvInfo & TotalInfo; + +type LegacyInfo = { + source_registry: string, + changes_stream_registry: string, + sync_changes_steam: any, +}; + +type SiteEnvInfo = { + sync_model: string; + sync_binary: string; + instance_start_time: Date; + node_version: string; + app_version: string; + engine: string; + cache_time: string; +}; + +type TotalInfo = { + last_package: string; + last_package_version: string; + doc_count: number | bigint; + doc_version_count: number | bigint; + update_seq: number | bigint; + download: DownloadInfo; + upstream_registries?: UpstreamRegistryInfo[]; +}; + + @HTTPController() export class HomeController extends AbstractController { @Inject() @@ -23,9 +57,12 @@ export class HomeController extends AbstractController { path: '/', method: HTTPMethodEnum.GET, }) + // 2023-1-20 + // 原有 LegacyInfo 字段继续保留,由于 ChangesStream 信息通过 registry 表配置,可能会过期 + // 新增 upstream_registries 字段,展示上游源站 registry 信息列表 async showTotal() { const totalData = await this.cacheService.getTotalData(); - const data = { + const data: SiteTotalData = { last_package: totalData.lastPackage, last_package_version: totalData.lastPackageVersion, doc_count: totalData.packageCount, @@ -42,6 +79,7 @@ export class HomeController extends AbstractController { source_registry: this.config.cnpmcore.sourceRegistry, changes_stream_registry: this.config.cnpmcore.changesStreamRegistry, cache_time: totalData.cacheTime, + upstream_registries: totalData.upstreamRegistries, }; return data; } diff --git a/app/port/schedule/UpdateTotalData.ts b/app/port/schedule/UpdateTotalData.ts index 273b85a3..895e8cc8 100644 --- a/app/port/schedule/UpdateTotalData.ts +++ b/app/port/schedule/UpdateTotalData.ts @@ -1,15 +1,17 @@ import { EggLogger } from 'egg'; import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule'; import { Inject } from '@eggjs/tegg'; +import { ChangesStreamTaskData } from '../../core/entity/Task'; +import { RegistryManagerService } from '../../core/service/RegistryManagerService'; import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository'; import { PackageRepository } from '../../repository/PackageRepository'; import { TaskRepository } from '../../repository/TaskRepository'; import { ChangeRepository } from '../../repository/ChangeRepository'; -import { CacheService } from '../../core/service/CacheService'; +import { CacheService, DownloadInfo, TotalData } from '../../core/service/CacheService'; import { TaskType } from '../../common/enum/Task'; +import { GLOBAL_WORKER } from '../../common/constants'; import dayjs from '../../common/dayjs'; - @Schedule({ type: ScheduleType.WORKER, scheduleData: { @@ -38,11 +40,12 @@ export class UpdateTotalData { @Inject() private readonly cacheService: CacheService; - async subscribe() { - const changesStreamTask = await this.taskRepository.findTaskByTargetName('GLOBAL_WORKER', TaskType.ChangesStream); - const packageTotal = await this.packageRepository.queryTotal(); + @Inject() + private readonly registryManagerService: RegistryManagerService; - const download = { + // 计算下载量相关信息,不区分不同 changesStream + private async calculateDownloadInfo() { + const download: DownloadInfo = { today: 0, yesterday: 0, samedayLastweek: 0, @@ -92,15 +95,44 @@ export class UpdateTotalData { } } } + return download; + } + + async subscribe() { + const packageTotal = await this.packageRepository.queryTotal(); + const download = await this.calculateDownloadInfo(); const lastChange = await this.changeRepository.getLastChange(); - const totalData = { + const totalData: TotalData = { ...packageTotal, download, - changesStream: changesStreamTask && changesStreamTask.data || {}, lastChangeId: lastChange && lastChange.id || 0, cacheTime: new Date().toISOString(), + changesStream: {} as unknown as ChangesStreamTaskData, + upstreamRegistries: [], }; + + const tasks = await this.taskRepository.findTasksByCondition({ type: TaskType.ChangesStream }); + for (const task of tasks) { + // 全局 changesStream + const data = task.data as ChangesStreamTaskData; + // 补充录入 upstreamRegistries + const registry = await this.registryManagerService.findByRegistryId(data.registryId as string); + if (registry) { + totalData.upstreamRegistries.push({ + ...data, + source_registry: registry?.host, + changes_stream_url: registry?.changeStream, + registry_name: registry?.name, + }); + } + + // 兼容 LegacyInfo 字段 + if (task.targetName === GLOBAL_WORKER) { + totalData.changesStream = data; + } + } + await this.cacheService.saveTotalData(totalData); this.logger.info('[UpdateTotalData.subscribe] total data: %j', totalData); } diff --git a/app/repository/PackageRepository.ts b/app/repository/PackageRepository.ts index de9ff97f..c1cd2741 100644 --- a/app/repository/PackageRepository.ts +++ b/app/repository/PackageRepository.ts @@ -1,9 +1,10 @@ import { AccessLevel, SingletonProto, Inject } from '@eggjs/tegg'; -import type { Package as PackageModel } from './model/Package'; +import { Orm } from '@eggjs/tegg-orm-plugin/lib/SingletonORM'; +import { Package as PackageModel } from './model/Package'; import { Package as PackageEntity } from '../core/entity/Package'; import { ModelConvertor } from './util/ModelConvertor'; import { PackageVersion as PackageVersionEntity } from '../core/entity/PackageVersion'; -import type { PackageVersion as PackageVersionModel } from './model/PackageVersion'; +import { PackageVersion as PackageVersionModel } from './model/PackageVersion'; import { PackageVersionManifest as PackageVersionManifestEntity } from '../core/entity/PackageVersionManifest'; import type { PackageVersionManifest as PackageVersionManifestModel } from './model/PackageVersionManifest'; import type { Dist as DistModel } from './model/Dist'; @@ -14,6 +15,8 @@ import type { Maintainer as MaintainerModel } from './model/Maintainer'; import type { User as UserModel } from './model/User'; import { User as UserEntity } from '../core/entity/User'; import { AbstractRepository } from './AbstractRepository'; +import { EggAppConfig } from 'egg'; +import { Bone } from 'leoric'; @SingletonProto({ accessLevel: AccessLevel.PUBLIC, @@ -40,6 +43,12 @@ export class PackageRepository extends AbstractRepository { @Inject() private readonly User: typeof UserModel; + @Inject() + private readonly config: EggAppConfig; + + @Inject() + private readonly orm: Orm; + async findPackage(scope: string, name: string): Promise { const model = await this.Package.findOne({ scope, name }); if (!model) return null; @@ -241,6 +250,20 @@ export class PackageRepository extends AbstractRepository { return ModelConvertor.convertModelToEntity(model, this.PackageVersionManifest); } + private getCountSql(model: typeof Bone):string { + const { database } = this.config.orm; + const sql = ` + SELECT + TABLE_ROWS + FROM + information_schema.tables + WHERE + table_schema = '${database}' + AND table_name = '${model.table}' + `; + return sql; + } + public async queryTotal() { const lastPkg = await this.Package.findOne().order('id', 'desc'); const lastVersion = await this.PackageVersion.findOne().order('id', 'desc'); @@ -252,7 +275,9 @@ export class PackageRepository extends AbstractRepository { if (lastPkg) { lastPackage = lastPkg.scope ? `${lastPkg.scope}/${lastPkg.name}` : lastPkg.name; // FIXME: id will be out of range number - packageCount = Number(lastPkg.id); + // 可能存在 id 增长不连续的情况,通过 count 查询 + const queryRes = await this.orm.client.query(this.getCountSql(PackageModel)); + packageCount = queryRes.rows?.[0].TABLE_ROWS as number; } if (lastVersion) { @@ -261,7 +286,8 @@ export class PackageRepository extends AbstractRepository { const fullname = pkg.scope ? `${pkg.scope}/${pkg.name}` : pkg.name; lastPackageVersion = `${fullname}@${lastVersion.version}`; } - packageVersionCount = Number(lastVersion.id); + const queryRes = await this.orm.client.query(this.getCountSql(PackageVersionModel)); + packageVersionCount = queryRes.rows?.[0].TABLE_ROWS as number; } return { packageCount, @@ -327,4 +353,5 @@ export class PackageRepository extends AbstractRepository { } return entities; } + } diff --git a/app/repository/TaskRepository.ts b/app/repository/TaskRepository.ts index 17cb3e1b..fe359e07 100644 --- a/app/repository/TaskRepository.ts +++ b/app/repository/TaskRepository.ts @@ -93,6 +93,11 @@ export class TaskRepository extends AbstractRepository { return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity)); } + async findTasksByCondition(where: { targetName?: string; state?: TaskState; type: TaskType }): Promise> { + const tasks = await this.Task.find(where); + return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity)); + } + async findTaskByTargetName(targetName: string, type: TaskType, state?: TaskState) { const where: any = { targetName, type }; if (state) { diff --git a/test/port/controller/HomeController/showTotal.test.ts b/test/port/controller/HomeController/showTotal.test.ts index 9a63dd23..d90d6efd 100644 --- a/test/port/controller/HomeController/showTotal.test.ts +++ b/test/port/controller/HomeController/showTotal.test.ts @@ -3,12 +3,23 @@ import { app, mock } from 'egg-mock/bootstrap'; import { TestUtil } from 'test/TestUtil'; import { PackageVersionDownload } from 'app/repository/model/PackageVersionDownload'; import dayjs from 'app/common/dayjs'; +import { RegistryManagerService } from 'app/core/service/RegistryManagerService'; +import { ChangesStreamService } from 'app/core/service/ChangesStreamService'; +import { TaskRepository } from 'app/repository/TaskRepository'; +import { TaskType } from 'app/common/enum/Task'; +import { ChangesStreamTask } from 'app/core/entity/Task'; +import { RegistryType } from 'app/common/enum/Registry'; +import { ScopeManagerService } from 'app/core/service/ScopeManagerService'; const SavePackageVersionDownloadCounterPath = require.resolve('../../../../app/port/schedule/SavePackageVersionDownloadCounter'); const UpdateTotalDataPath = require.resolve('../../../../app/port/schedule/UpdateTotalData'); describe('test/port/controller/HomeController/showTotal.test.ts', () => { describe('[GET /] showTotal()', () => { + let registryManagerService: RegistryManagerService; + let changesStreamService: ChangesStreamService; + let taskRepository: TaskRepository; + let scopeManagerService: ScopeManagerService; it('should total information', async () => { let res = await app.httpRequest() .get('/'); @@ -58,6 +69,7 @@ describe('test/port/controller/HomeController/showTotal.test.ts', () => { await app.runSchedule(SavePackageVersionDownloadCounterPath); await app.runSchedule(UpdateTotalDataPath); + res = await app.httpRequest() .get('/'); assert(res.status === 200); @@ -171,5 +183,87 @@ describe('test/port/controller/HomeController/showTotal.test.ts', () => { const data = res.body; assert(data.sync_binary === true); }); + + describe('upstream_registries', async () => { + beforeEach(async () => { + registryManagerService = await app.getEggObject(RegistryManagerService); + changesStreamService = await app.getEggObject(ChangesStreamService); + taskRepository = await app.getEggObject(TaskRepository); + scopeManagerService = await app.getEggObject(ScopeManagerService); + await app.runSchedule(UpdateTotalDataPath); + }); + it('should show empty upstream_registries when no changesStreamTasks', async () => { + const res = await app.httpRequest() + .get('/') + .expect(200) + .expect('content-type', 'application/json; charset=utf-8'); + const data = res.body; + assert(data.upstream_registries.length === 0); + }); + it('should show default registry', async () => { + // create default registry + await changesStreamService.findExecuteTask(); + + const tasks = await taskRepository.findTasksByCondition({ type: TaskType.ChangesStream }); + await changesStreamService.executeTask(tasks[0] as ChangesStreamTask); + assert(tasks.length === 1); + + assert(registryManagerService); + await app.runSchedule(UpdateTotalDataPath); + const res = await app.httpRequest() + .get('/') + .expect(200) + .expect('content-type', 'application/json; charset=utf-8'); + const data = res.body; + assert(data.upstream_registries.length === 1); + const [ upstream ] = data.upstream_registries; + assert(upstream.registry_name === 'default'); + assert(upstream.changes_stream_url === 'https://replicate.npmjs.com/_changes'); + assert(upstream.source_registry === 'https://registry.npmjs.org'); + }); + + it('should show custom registry', async () => { + // create default registry + await changesStreamService.findExecuteTask(); + + // create registry + const registry = await registryManagerService.createRegistry({ + name: 'custom', + changeStream: 'https://r.cnpmjs.org/_changes', + host: 'https://cnpmjs.org', + userPrefix: 'cnpm:', + type: RegistryType.Cnpmcore, + }); + await scopeManagerService.createScope({ name: '@cnpm', registryId: registry.registryId }); + await registryManagerService.createSyncChangesStream({ registryId: registry.registryId }); + + // start sync + const tasks = await taskRepository.findTasksByCondition({ type: TaskType.ChangesStream }); + assert(tasks.length === 2); + for (const task of tasks) { + await changesStreamService.executeTask(task as ChangesStreamTask); + } + + // refresh total + await app.runSchedule(UpdateTotalDataPath); + const res = await app.httpRequest() + .get('/') + .expect(200) + .expect('content-type', 'application/json; charset=utf-8'); + const data = res.body; + assert(data.upstream_registries.length === 2); + const [ defaultRegistry ] = data.upstream_registries.filter(item => item.registry_name === 'default'); + assert(defaultRegistry.registry_name === 'default'); + assert(defaultRegistry.changes_stream_url === 'https://replicate.npmjs.com/_changes'); + assert(defaultRegistry.source_registry === 'https://registry.npmjs.org'); + + const [ customRegistry ] = data.upstream_registries.filter(item => item.registry_name === 'custom'); + assert(customRegistry.registry_name === 'custom'); + assert(customRegistry.changes_stream_url === 'https://r.cnpmjs.org/_changes'); + assert(customRegistry.source_registry === 'https://cnpmjs.org'); + + }); + }); + }); }); diff --git a/test/repository/PackageRepository.test.ts b/test/repository/PackageRepository.test.ts new file mode 100644 index 00000000..a96ea089 --- /dev/null +++ b/test/repository/PackageRepository.test.ts @@ -0,0 +1,46 @@ +import assert from 'assert'; +import { app } from 'egg-mock/bootstrap'; +import { PackageRepository } from 'app/repository/PackageRepository'; +import { PackageManagerService } from 'app/core/service/PackageManagerService'; +import { UserService } from 'app/core/service/UserService'; + +describe('test/repository/PackageRepository.test.ts', () => { + let packageRepository: PackageRepository; + let packageManagerService: PackageManagerService; + let userService: UserService; + + describe('queryTotal', () => { + beforeEach(async () => { + packageRepository = await app.getEggObject(PackageRepository); + packageManagerService = await app.getEggObject(PackageManagerService); + userService = await app.getEggObject(UserService); + + }); + it('should work', async () => { + const { packageCount, packageVersionCount } = await packageRepository.queryTotal(); + const { user } = await userService.create({ + name: 'test-user', + password: 'this-is-password', + email: 'hello@example.com', + ip: '127.0.0.1', + }); + await packageManagerService.publish({ + dist: { + content: Buffer.alloc(0), + }, + tag: '', + scope: '', + name: 'foo', + description: 'foo description', + packageJson: {}, + readme: '', + version: '1.0.0', + isPrivate: true, + }, user); + const res = await packageRepository.queryTotal(); + // information_schema 只能返回大概值,仅验证增加 + assert(res.packageCount > packageCount); + assert(res.packageVersionCount > packageVersionCount); + }); + }); +});