Skip to content

flocasts/nestjs-sqs-microservice

Repository files navigation

NestJS SQS Microservice

This is a fully featured NestJS microservice wrapper for sqs-consumer, leveraging the wonderful feature set of the NestJS Microservice ecosystem. This means you get a seamless, controller based experience for interacting with SQS queues!

Features

Decorators

Name Type Description Options Batch Compatible
@SqsMessageHandler Method Decorator Marks a Controller method as an SQS Message Handler ConsumerOptions
@SqsMessageBody Property Decorator Extracts the message body from the SQS message Body key (optional)
@SqsMessageId Property Decorator Extracts the message ID from the SQS message
@SqsAttributes Property Decorator Extracts attributes from the SQS message Attribute Key (optional)
@SqsMessageAttributes Property Decorator Extracts message attributes from the SQS message Message Attribute Key (optional)
@SqsBatchMessages Property Decorator Extracts messages from an SQS Batch message

Getting Started

Setting up the Microservice

Hybrid Applications

In most cases you'll end up going with a hybrid application, meaning you'll use both HTTP and Microservice handlers, here's how you do that:

import { NestFactory } from '@nestjs/core';
import { TestModule } from './test.module.js';
import { AwsSQSServer } from '../index.js';

async function bootstrap() {
    const app = await NestFactory.create(TestModule);
    app.connectMicroservice({
        strategy: new AwsSQSServer(),
    });

    await app.startAllMicroservices();
    await app.listen(3000);
}
bootstrap();

Check here for the full file

Standalone Microservice Applications

Sometimes you do just need a microservice though, and here's what that looks like:

import { NestFactory } from '@nestjs/core';
import { TestModule } from './test.module.js';
import { AwsSQSServer } from '../index.js';

async function bootstrap() {
    const app = await NestFactory.createMicroservice(TestModule, {
        strategy: new AwsSQSServer(),
    });

    await app.listen();
}
bootstrap();

Check here for the full file

Setting up Controllers

Here's the controller from our tests (check here for the full thing!):

@Controller()
export class TestSqsController {
    private readonly logger = new Logger(TestSqsController.name);

    @SqsMessageHandler({
        queueUrl: process.env.SQS_QUEUE_URL!,
        attributeNames: ['All'],
    })
    public async messageHandler(
        // This will extract the message body from the SQS message
        @SqsMessageBody(new ZodValidationPipe(testSchema)) body: TestSchema,
        // This will extract the message ID from the SQS message
        @SqsMessageId() messageId: string,
        // This will extract the attributes  from the SQS message
        @SqsAttributes() attributes: unknown,
        @SqsAttributes('SenderId') senderId: unknown,
        // This will extract the message attributes from the SQS message
        @SqsMessageAttributes() messageAttributes: unknown,
    ): Promise<void> {
        this.logger.log({msg: 'A message was received!', messageId, messageAttributes, attributes, body, senderId });
    }

We also support the batch syntax, as used by sqs-consumer:

@Controller()
export class TestSqsController {
    private readonly logger = new Logger(TestSqsController.name);

    @SqsMessageHandler({
        queueUrl: process.env.SQS_QUEUE_URL!,
        batch: true,
    })
    public async batchMessageHandler(@SqsBatchMessages() messages: Message[]): Promise<void> {
        this.logger.log({ msg: 'A batch message was received!', messages });
    }
}

N.B. Each queue url can only have one message handler, regardless of whether is is batched. After the initial handler is detected, any subsequent handlers will result in an error.

Queue Events

In addition to messages, we also expose other events available from the sqs-consumer library, here's an example.

@Controller()
export class TestSqsController {
    private readonly logger = new Logger(TestSqsController.name);

    @SqsMessageHandler({
        queueUrl: process.env.SQS_QUEUE_URL!,
        attributeNames: ['All'],
    })
    public async messageHandler(): Promise<void> {
        this.logger.log({ msg: 'A message was received!' });
    }

    @SqsQueueEventHandler({
        queueUrl: process.env.SQS_QUEUE_URL!,
        event: 'error',
    })
    public async eventHandler([error, message]: Events['error']): Promise<void> {
        this.logger.log({ msg: 'An event was received!', error, message });
    }
}

N.B. Only one handler for each event can be applied per queue.