Skip to content

Commit

Permalink
refactor(server): job discorvery
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasm91 committed Oct 31, 2024
1 parent 1602767 commit c055689
Show file tree
Hide file tree
Showing 32 changed files with 320 additions and 374 deletions.
8 changes: 8 additions & 0 deletions server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { controllers } from 'src/controllers';
import { entities } from 'src/entities';
import { ImmichWorker } from 'src/enum';
import { IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository } from 'src/interfaces/job.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { ITelemetryRepository } from 'src/interfaces/telemetry.interface';
import { AuthGuard } from 'src/middleware/auth.guard';
Expand Down Expand Up @@ -64,6 +65,7 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy {
constructor(
@Inject(ILoggerRepository) logger: ILoggerRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ITelemetryRepository) private telemetryRepository: ITelemetryRepository,
) {
logger.setAppName(this.worker);
Expand All @@ -73,6 +75,12 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy {

async onModuleInit() {
this.telemetryRepository.setup({ repositories: repositories.map(({ useClass }) => useClass) });

this.jobRepository.setup({ services });
if (this.worker === ImmichWorker.MICROSERVICES) {
this.jobRepository.startWorkers();
}

this.eventRepository.setup({ services });
await this.eventRepository.emit('app.bootstrap', this.worker);
}
Expand Down
7 changes: 7 additions & 0 deletions server/src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { MetadataKey } from 'src/enum';
import { EmitEvent } from 'src/interfaces/event.interface';
import { JobName, QueueName } from 'src/interfaces/job.interface';
import { setUnion } from 'src/utils/set';

// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
Expand Down Expand Up @@ -122,6 +123,12 @@ export type EventConfig = {
};
export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config);

export type JobConfig = {
name: JobName;
queue: QueueName;
};
export const OnJob = (config: JobConfig) => SetMetadata(MetadataKey.JOB_CONFIG, config);

type LifecycleRelease = 'NEXT_RELEASE' | string;
type LifecycleMetadata = {
addedAt?: LifecycleRelease;
Expand Down
1 change: 1 addition & 0 deletions server/src/enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export enum MetadataKey {
SHARED_ROUTE = 'shared_route',
API_KEY_SECURITY = 'api_key',
EVENT_CONFIG = 'event_config',
JOB_CONFIG = 'job_config',
TELEMETRY_ENABLED = 'telemetry_enabled',
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SystemConfig } from 'src/config';
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.dto';
import { ImmichWorker } from 'src/enum';
import { JobItem, QueueName } from 'src/interfaces/job.interface';

export const IEventRepository = 'IEventRepository';

Expand Down Expand Up @@ -38,6 +39,8 @@ type EventMap = {
'assets.delete': [{ assetIds: string[]; userId: string }];
'assets.restore': [{ assetIds: string[]; userId: string }];

'job.start': [QueueName, JobItem];

// session events
'session.delete': [{ sessionId: string }];

Expand Down
16 changes: 9 additions & 7 deletions server/src/interfaces/job.interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ClassConstructor } from 'class-transformer';
import { EmailImageAttachment } from 'src/interfaces/notification.interface';

export enum QueueName {
Expand Down Expand Up @@ -238,8 +239,8 @@ export type JobItem =

// Migration
| { name: JobName.QUEUE_MIGRATION; data?: IBaseJob }
| { name: JobName.MIGRATE_ASSET; data?: IEntityJob }
| { name: JobName.MIGRATE_PERSON; data?: IEntityJob }
| { name: JobName.MIGRATE_ASSET; data: IEntityJob }
| { name: JobName.MIGRATE_PERSON; data: IEntityJob }

// Metadata Extraction
| { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob }
Expand Down Expand Up @@ -286,7 +287,7 @@ export type JobItem =
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSET; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSET; data: ILibraryAssetJob }
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob }
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
Expand All @@ -305,14 +306,15 @@ export enum JobStatus {
FAILED = 'failed',
SKIPPED = 'skipped',
}

export type JobHandler<T = any> = (data: T) => Promise<JobStatus>;
export type JobItemHandler = (item: JobItem) => Promise<void>;
export type Jobs = { [K in JobItem['name']]: (JobItem & { name: K })['data'] };
export type JobOf<T extends JobName> = Jobs[T];

export const IJobRepository = 'IJobRepository';

export interface IJobRepository {
addHandler(queueName: QueueName, concurrency: number, handler: JobItemHandler): void;
setup(options: { services: ClassConstructor<unknown>[] }): void;
startWorkers(): void;
run(job: JobItem): Promise<JobStatus>;
addCronJob(name: string, expression: string, onTick: () => void, start?: boolean): void;
updateCronJob(name: string, expression?: string, start?: boolean): void;
setConcurrency(queueName: QueueName, concurrency: number): void;
Expand Down
190 changes: 96 additions & 94 deletions server/src/repositories/job.repository.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,122 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { ModuleRef, Reflector } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';
import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { ClassConstructor } from 'class-transformer';
import { CronJob, CronTime } from 'cron';
import { setTimeout } from 'node:timers/promises';
import { JobConfig } from 'src/decorators';
import { MetadataKey } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
import {
IEntityJob,
IJobRepository,
JobCounts,
JobItem,
JobName,
JobOf,
JobStatus,
QueueCleanType,
QueueName,
QueueStatus,
} from 'src/interfaces/job.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';

export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
// misc
[JobName.ASSET_DELETION]: QueueName.BACKGROUND_TASK,
[JobName.ASSET_DELETION_CHECK]: QueueName.BACKGROUND_TASK,
[JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK,
[JobName.USER_DELETION]: QueueName.BACKGROUND_TASK,
[JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK,
[JobName.CLEAN_OLD_AUDIT_LOGS]: QueueName.BACKGROUND_TASK,
[JobName.CLEAN_OLD_SESSION_TOKENS]: QueueName.BACKGROUND_TASK,
[JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK,
[JobName.USER_SYNC_USAGE]: QueueName.BACKGROUND_TASK,

// backups
[JobName.BACKUP_DATABASE]: QueueName.BACKUP_DATABASE,

// conversion
[JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
[JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,

// thumbnails
[JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
[JobName.GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
[JobName.GENERATE_PERSON_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION,

// tags
[JobName.TAG_CLEANUP]: QueueName.BACKGROUND_TASK,

// metadata
[JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
[JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
[JobName.LINK_LIVE_PHOTOS]: QueueName.METADATA_EXTRACTION,

// storage template
[JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION,
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION,

// migration
[JobName.QUEUE_MIGRATION]: QueueName.MIGRATION,
[JobName.MIGRATE_ASSET]: QueueName.MIGRATION,
[JobName.MIGRATE_PERSON]: QueueName.MIGRATION,

// facial recognition
[JobName.QUEUE_FACE_DETECTION]: QueueName.FACE_DETECTION,
[JobName.FACE_DETECTION]: QueueName.FACE_DETECTION,
[JobName.QUEUE_FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,
[JobName.FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,

// smart search
[JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH,
[JobName.SMART_SEARCH]: QueueName.SMART_SEARCH,

// duplicate detection
[JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
[JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,

// XMP sidecars
[JobName.QUEUE_SIDECAR]: QueueName.SIDECAR,
[JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR,
[JobName.SIDECAR_SYNC]: QueueName.SIDECAR,
[JobName.SIDECAR_WRITE]: QueueName.SIDECAR,

// Library management
[JobName.LIBRARY_SYNC_FILE]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_FILES]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_ASSETS]: QueueName.LIBRARY,
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
[JobName.LIBRARY_SYNC_ASSET]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_ALL]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_CLEANUP]: QueueName.LIBRARY,

// Notification
[JobName.SEND_EMAIL]: QueueName.NOTIFICATION,
[JobName.NOTIFY_ALBUM_INVITE]: QueueName.NOTIFICATION,
[JobName.NOTIFY_ALBUM_UPDATE]: QueueName.NOTIFICATION,
[JobName.NOTIFY_SIGNUP]: QueueName.NOTIFICATION,

// Version check
[JobName.VERSION_CHECK]: QueueName.BACKGROUND_TASK,

// Trash
[JobName.QUEUE_TRASH_EMPTY]: QueueName.BACKGROUND_TASK,
type JobMapItem = {
jobName: JobName;
queueName: QueueName;
handler: (job: JobOf<any>) => Promise<JobStatus>;
label: string;
};

@Injectable()
export class JobRepository implements IJobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};

constructor(
private moduleReference: ModuleRef,
private schedulerReqistry: SchedulerRegistry,
private moduleRef: ModuleRef,
private schedulerRegistry: SchedulerRegistry,
@Inject(IConfigRepository) private configRepository: IConfigRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(JobRepository.name);
}

addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise<void>) {
setup({ services }: { services: ClassConstructor<unknown>[] }) {
const reflector = this.moduleRef.get(Reflector, { strict: false });

// discovery
for (const Service of services) {
const instance = this.moduleRef.get<any>(Service);
for (const methodName of getMethodNames(instance)) {
const handler = instance[methodName];
const config = reflector.get<JobConfig>(MetadataKey.JOB_CONFIG, handler);
if (!config) {
continue;
}

const { name: jobName, queue: queueName } = config;
const label = `${Service.name}.${handler.name}`;

// one handler per job
if (this.handlers[jobName]) {
const jobKey = getKeyByValue(JobName, jobName);
const errorMessage = `Failed to add job handler for ${label}`;
this.logger.error(
`${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`,
);
throw new ImmichStartupError(errorMessage);
}

this.handlers[jobName] = {
label,
jobName,
queueName,
handler: handler.bind(instance),
};

this.logger.verbose(`Added job handler: ${jobName} => ${label}`);
}
}

// no missing handlers
for (const [jobKey, jobName] of Object.entries(JobName)) {
const item = this.handlers[jobName];
if (!item) {
const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`;
this.logger.error(
`${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`,
);
throw new ImmichStartupError(errorMessage);
}
}
}

startWorkers() {
const { bull } = this.configRepository.getEnv();
const workerHandler: Processor = async (job: Job) => handler(job as JobItem);
const workerOptions: WorkerOptions = { ...bull.config, concurrency };
this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions);
for (const queueName of Object.values(QueueName)) {
this.logger.debug(`Starting worker for queue: ${queueName}`);
this.workers[queueName] = new Worker(
queueName,
(job) => this.eventRepository.emit('job.start', queueName, job as JobItem),
{ ...bull.config, concurrency: 1 },
);
}
}

async run({ name, data }: JobItem) {
const item = this.handlers[name as JobName];
if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`);
return JobStatus.SKIPPED;
}

return item.handler(data);
}

addCronJob(name: string, expression: string, onTick: () => void, start = true): void {
Expand All @@ -141,11 +139,11 @@ export class JobRepository implements IJobRepository {
true,
);

this.schedulerReqistry.addCronJob(name, job);
this.schedulerRegistry.addCronJob(name, job);
}

updateCronJob(name: string, expression?: string, start?: boolean): void {
const job = this.schedulerReqistry.getCronJob(name);
const job = this.schedulerRegistry.getCronJob(name);
if (expression) {
job.setTime(new CronTime(expression));
}
Expand Down Expand Up @@ -204,6 +202,10 @@ export class JobRepository implements IJobRepository {
) as unknown as Promise<JobCounts>;
}

private getQueueName(name: JobName) {
return (this.handlers[name] as JobMapItem).queueName;
}

async queueAll(items: JobItem[]): Promise<void> {
if (items.length === 0) {
return;
Expand All @@ -212,7 +214,7 @@ export class JobRepository implements IJobRepository {
const promises = [];
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
for (const item of items) {
const queueName = JOBS_TO_QUEUE[item.name];
const queueName = this.getQueueName(item.name);
const job = {
name: item.name,
data: item.data || {},
Expand Down Expand Up @@ -273,11 +275,11 @@ export class JobRepository implements IJobRepository {
}

private getQueue(queue: QueueName): Queue {
return this.moduleReference.get<Queue>(getQueueToken(queue), { strict: false });
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
}

public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> {
const existingJob = await this.getQueue(JOBS_TO_QUEUE[name]).getJob(jobId);
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId);
if (!existingJob) {
return;
}
Expand Down
Loading

0 comments on commit c055689

Please sign in to comment.