Skip to content

Commit

Permalink
[NEW] Implement Queue service (#27531)
Browse files Browse the repository at this point in the history
  • Loading branch information
KevLehman authored and filipemarins committed Dec 29, 2022
1 parent 9e0eb7c commit af26190
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 9 deletions.
1 change: 1 addition & 0 deletions ee/apps/queue-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"fibers": "^5.0.3",
"mem": "^8.1.1",
"moleculer": "^0.14.21",
"mongo-message-queue": "^1.0.0",
"mongodb": "^4.12.1",
"nats": "^2.4.0",
"pino": "^8.4.2",
Expand Down
98 changes: 92 additions & 6 deletions ee/apps/queue-worker/src/QueueWorker.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,104 @@
import type { Db } from 'mongodb';
import type { ValidResult, Work } from 'mongo-message-queue';
import MessageQueue from 'mongo-message-queue';

import { ServiceClass } from '../../../../apps/meteor/server/sdk/types/ServiceClass';
import type { IQueueWorkerService } from '../../../../apps/meteor/server/sdk/types/IQueueWorkerService';
import type { IQueueWorkerService, HealthAggResult } from '../../../../apps/meteor/server/sdk/types/IQueueWorkerService';
import type { Logger } from '../../../../apps/meteor/server/lib/logger/Logger';

export class QueueWorker extends ServiceClass implements IQueueWorkerService {
protected name = 'queue-worker';

constructor() {
protected retryCount = 5;

protected queue: MessageQueue;

private logger: Logger;

constructor(private readonly db: Db, loggerClass: typeof Logger) {
super();

// your stuff
// eslint-disable-next-line new-cap
this.logger = new loggerClass('QueueWorker');
this.queue = new MessageQueue();
}

getConfig(): unknown {
return null;
isServiceNotFoundMessage(message: string): boolean {
return message.includes('is not found');
}

// more stuff
async created(): Promise<void> {
this.logger.info('Starting queue worker');
this.queue.databasePromise = () => {
return Promise.resolve(this.db);
};

try {
await this.registerWorkers();
} catch (e) {
this.logger.fatal(e, 'Fatal error occurred when registering workers');
process.exit(1);
}
}

async stopped(): Promise<void> {
this.logger.info('Stopping queue worker');
this.queue.stopPolling();
}

// Registers the actual workers, the actions lib will try to fetch elements to work on
private async registerWorkers(): Promise<void> {
this.logger.info('Registering workers of type "work"');
this.queue.registerWorker('work', (queueItem: Work<{ to: string; foo: string }>): Promise<ValidResult> => {
this.logger.info(`Processing queue item ${queueItem._id}`);
this.logger.info(`Queue item is trying to call ${queueItem.message.to}`);
return this.api
.waitAndCall(queueItem.message.to, queueItem.message)
.then(() => {
this.logger.info(`Queue item ${queueItem._id} completed`);
return 'Completed' as const;
})
.catch((err) => {
this.logger.error(`Queue item ${queueItem._id} errored: ${err.message}`);
queueItem.releasedReason = err.message;
// Let's only retry for X times when the error is "service not found"
// For any other error, we'll just reject the item
if ((queueItem.retryCount ?? 0) < this.retryCount && this.isServiceNotFoundMessage(err.message)) {
// Let's retry in 5 seconds
this.logger.info(`Queue item ${queueItem._id} will be retried in 5 seconds`);
queueItem.nextReceivableTime = new Date(Date.now() + 5000);
return 'Retry' as const;
}

this.logger.info(`Queue item ${queueItem._id} will be rejected`);
queueItem.rejectionReason = err.message;
return 'Rejected' as const;
});
});
}

// Queues an action of type "work" to be processed by the workers
// Action receives a record of unknown data that will be passed to the actual service
// `to` is a service name that will be called, including namespace + action
async queueWork<T extends Record<string, unknown>>(to: string, data: T): Promise<void> {
this.logger.info(`Queueing work for ${to}`);
await this.queue.enqueue<typeof data>('work', { to, ...data });
}

async queueInfo(): Promise<HealthAggResult[]> {
this.logger.info('Health check');
return this.db
.collection(this.queue.collectionName)
.aggregate<HealthAggResult>([
{
$addFields: {
status: { $cond: [{ $ifNull: ['$rejectionReason', false] }, 'Rejected', 'In progress'] },
},
},
{ $group: { _id: { type: '$type', status: '$status' }, elements: { $push: '$$ROOT' }, total: { $sum: 1 } } },
// Project from each group the type, status and total of elements
{ $project: { _id: 0, type: '$_id.type', status: '$_id.status', total: 1 } },
])
.toArray();
}
}
34 changes: 34 additions & 0 deletions ee/apps/queue-worker/src/externals/mongo-message-queue.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
declare module 'mongo-message-queue' {
import type { Db } from 'mongodb';

export type Work<T> = {
_id: string;
dateCreated: Date;
type: string;
message: T;
priority: number;
receivedTime: Date;
releasedReason?: string;
retryCount?: number;
nextReceivableTime?: Date;
rejectionReason?: string;
};

export type ValidResult = 'Completed' | 'Rejected' | 'Retry';

export type Actions = 'work';

export default class MessageQueue {
collectionName: string;

databasePromise: () => Promise<Db>;

registerWorker<T>(type: Actions, worker: (queueItem: Work<T>) => Promise<ValidResult>): void;

enqueue<T>(type: Actions, message: T, options?: { nextReceivableTime: Date; priority: number }): Promise<void>;

enqueueAndProcess<T>(type: Actions, message: T, options?: { nextReceivableTime: Date; priority: number }): Promise<void>;

stopPolling(): void;
}
}
3 changes: 2 additions & 1 deletion ee/apps/queue-worker/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { api } from '../../../../apps/meteor/server/sdk/api';
import { broker } from '../../../../apps/meteor/ee/server/startup/broker';
import { Collections, getCollection, getConnection } from '../../../../apps/meteor/ee/server/services/mongo';
import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/registerServiceModels';
import { Logger } from '../../../../apps/meteor/server/lib/logger/Logger';

const PORT = process.env.PORT || 3038;

Expand All @@ -20,7 +21,7 @@ const PORT = process.env.PORT || 3038;
// need to import service after models are registeredpackagfe
const { QueueWorker } = await import('./QueueWorker');

api.registerService(new QueueWorker());
api.registerService(new QueueWorker(db, Logger));

await api.start();

Expand Down
2 changes: 1 addition & 1 deletion ee/apps/queue-worker/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
"declarationMap": false
},
"files": ["./src/service.ts"],
"include": ["../../../apps/meteor/definition/externals/meteor"],
"include": ["../../../apps/meteor/definition/externals/meteor", "./src/externals"],
"exclude": ["./dist"]
}
9 changes: 8 additions & 1 deletion packages/core-services/src/types/IQueueWorkerService.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
export type HealthAggResult = {
total: number;
type: string;
status: 'Rejected' | 'In progress';
};

export interface IQueueWorkerService {
getConfig(): unknown;
queueWork<T extends Record<string, unknown>>(to: string, data: T): Promise<void>;
queueInfo(): Promise<HealthAggResult[]>;
}

0 comments on commit af26190

Please sign in to comment.