diff --git a/packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts b/packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts index e5743dc4af..06e0056680 100644 --- a/packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts +++ b/packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts @@ -71,8 +71,9 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement } const subscription = this.subscription(queueName); - const listener = (message: Message) => { - Logger.debug(`Received message: ${queueName}: ${message.id}`, loggerCtx); + + const processMessage = async (message: Message) => { + Logger.verbose(`Received message: ${queueName}: ${message.id}`, loggerCtx); const job = new Job({ id: message.id, @@ -84,12 +85,21 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement createdAt: message.publishTime, }); - process(job) + await process(job); + }; + + const listener = (message: Message) => { + processMessage(message) .then(() => { message.ack(); + Logger.verbose(`Finished handling: ${queueName}: ${message.id}`, loggerCtx); }) .catch(err => { message.nack(); + Logger.error( + `Error handling: ${queueName}: ${message.id}: ${String(err.message)}`, + loggerCtx, + ); }); }; this.listeners.set(queueName, process, listener);