diff --git a/e2e/module.e2e-spec.ts b/e2e/module.e2e-spec.ts index fb97c8c..2b57cb5 100644 --- a/e2e/module.e2e-spec.ts +++ b/e2e/module.e2e-spec.ts @@ -66,6 +66,29 @@ describe('SqsModule', () => { expect(sqsService.options.consumers).toHaveLength(1); expect(sqsService.options.producers).toHaveLength(1); }); + + it('should register module async with globalOptions', async () => { + module = await Test.createTestingModule({ + imports: [ + SqsModule.registerAsync({ + useFactory: async () => { + return { + globalOptions: { + endpoint: SQS_ENDPOINT, + }, + consumers: [TestQueues[TestQueue.Test]], + producers: [TestQueues[TestQueue.Test]], + }; + }, + }), + ], + }).compile(); + + const sqsService = module.get(SqsService); + expect(sqsService).toBeTruthy(); + expect(sqsService.options.consumers).toHaveLength(1); + expect(sqsService.options.producers).toHaveLength(1); + }); }); describe('full flow', () => { diff --git a/lib/sqs.service.ts b/lib/sqs.service.ts index 6701c94..a53a8e5 100644 --- a/lib/sqs.service.ts +++ b/lib/sqs.service.ts @@ -3,6 +3,7 @@ import { Consumer, StopOptions } from 'sqs-consumer'; import { Producer } from 'sqs-producer'; import { SQSClient, GetQueueAttributesCommand, PurgeQueueCommand, QueueAttributeName } from '@aws-sdk/client-sqs'; import { + GlobalOptions, Message, QueueName, SqsConsumerEventHandlerMeta, @@ -19,6 +20,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { public readonly producers = new Map(); private logger: LoggerService; + private globalOptions: GlobalOptions; private globalStopOptions: StopOptions; public constructor( @@ -28,6 +30,7 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { public async onModuleInit(): Promise { this.logger = this.options.logger ?? new Logger('SqsService', { timestamp: false }); + this.globalOptions = this.options.globalOptions ?? {}; this.globalStopOptions = this.options.globalStopOptions ?? {}; const messageHandlers = await this.discover.providerMethodsWithMetaAtKey( @@ -49,9 +52,15 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { return; } + const shouldUseGlobalOptionsEndpoint = this.globalOptions.endpoint && !consumerOptions.sqs; const isBatchHandler = metadata.meta.batch === true; const consumer = Consumer.create({ ...consumerOptions, + ...(shouldUseGlobalOptionsEndpoint && { + sqs: new SQSClient({ + endpoint: this.globalOptions.endpoint, + }), + }), ...(isBatchHandler ? { handleMessageBatch: metadata.discoveredMethod.handler.bind( @@ -79,7 +88,15 @@ export class SqsService implements OnModuleInit, OnModuleDestroy { throw new Error(`Producer already exists: ${name}`); } - const producer = Producer.create(producerOptions); + const shouldUseGlobalOptionsEndpoint = this.globalOptions.endpoint && !producerOptions.sqs; + const producer = Producer.create({ + ...producerOptions, + ...(shouldUseGlobalOptionsEndpoint && { + sqs: new SQSClient({ + endpoint: this.globalOptions.endpoint, + }), + }), + }); this.producers.set(name, producer); }); diff --git a/lib/sqs.types.ts b/lib/sqs.types.ts index 23034b5..531279b 100644 --- a/lib/sqs.types.ts +++ b/lib/sqs.types.ts @@ -20,7 +20,12 @@ export type SqsProducerOptions = ProducerOptions & { name: QueueName; }; +export type GlobalOptions = { + endpoint?: string; +}; + export interface SqsOptions { + globalOptions?: GlobalOptions; consumers?: SqsConsumerOptions[]; producers?: SqsProducerOptions[]; logger?: LoggerService;