From f692bbdd65b76f205b12c1c7bdc0d7eba5a4bc39 Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Mon, 10 Apr 2023 17:05:19 +0100 Subject: [PATCH] feat: adding the ability to update visibilityTimeout (#379) * feat: adding the ability to update options * chore: updating README * chore: formatting * chore: updating validation * chore: adding better test validation * chore: breaking option update into its own func * chore: formatting --- README.md | 6 +++ src/consumer.ts | 48 +++++++++++++++++++++- src/types.ts | 6 +++ test/tests/consumer.test.ts | 81 ++++++++++++++++++++++++++++++++++--- 4 files changed, 135 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index c1c82e24..17eac30a 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,12 @@ By default, the value of `abort` is set to `false` which means pre existing requ Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not. +### `consumer.updateOption(option, value)` + +Updates the provided option with the provided value. + +You can [find out more about this here](https://bbc.github.io/sqs-consumer/public/classes/Consumer.html#updateOption). + ### Events Each consumer is an [`EventEmitter`](https://nodejs.org/api/events.html) and [emits these events](https://bbc.github.io/sqs-consumer/interfaces/Events.html). diff --git a/src/consumer.ts b/src/consumer.ts index 7d61ee66..6b0c786d 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -17,7 +17,12 @@ import { } from '@aws-sdk/client-sqs'; import Debug from 'debug'; -import { ConsumerOptions, TypedEventEmitter, StopOptions } from './types'; +import { + ConsumerOptions, + TypedEventEmitter, + StopOptions, + UpdatableOptions +} from './types'; import { autoBind } from './bind'; import { SQSError, @@ -132,6 +137,47 @@ export class Consumer extends TypedEventEmitter { return !this.stopped; } + /** + * Updates visibilityTimeout to the provided value. + * @param value The value to set visibilityTimeout to + */ + private updateVisibilityTimeout(value: ConsumerOptions['visibilityTimeout']) { + if (typeof value !== 'number') { + throw new Error('visibilityTimeout must be a number'); + } + + if ( + typeof value !== 'number' || + (this.heartbeatInterval && value <= this.heartbeatInterval) + ) { + throw new Error('heartbeatInterval must be less than visibilityTimeout.'); + } + + debug(`Updating the visibilityTimeout option to the value ${value}`); + + this.visibilityTimeout = value; + + this.emit('option_updated', 'visibilityTimeout', value); + } + + /** + * Updates the provided option to the provided value. + * @param option The option that you want to update + * @param value The value to set the option to + */ + public updateOption( + option: UpdatableOptions, + value: ConsumerOptions[UpdatableOptions] + ) { + switch (option) { + case 'visibilityTimeout': + this.updateVisibilityTimeout(value); + break; + default: + throw new Error(`The update ${option} cannot be updated`); + } + } + /** * Emit one of the consumer's error events depending on the error received. * @param err The error object to forward on diff --git a/src/types.ts b/src/types.ts index 350ada8c..8a951cea 100644 --- a/src/types.ts +++ b/src/types.ts @@ -106,6 +106,8 @@ export interface ConsumerOptions { handleMessageBatch?(messages: Message[]): Promise; } +export type UpdatableOptions = 'visibilityTimeout'; + export interface StopOptions { /** * Default to `false`, if you want the stop action to also abort requests to SQS @@ -155,6 +157,10 @@ export interface Events { * Fired when the consumer finally stops its work. */ stopped: []; + /** + * Fired when an option is updated + */ + option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]]; } export class TypedEventEmitter extends EventEmitter { diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 087e1414..693fb2fd 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -101,7 +101,7 @@ describe('Consumer', () => { region: REGION, queueUrl: QUEUE_URL }); - }); + }, `Missing SQS consumer option [ handleMessage or handleMessageBatch ].`); }); it('requires the batchSize option to be no greater than 10', () => { @@ -112,7 +112,7 @@ describe('Consumer', () => { handleMessage, batchSize: 11 }); - }); + }, 'SQS batchSize option must be between 1 and 10.'); }); it('requires the batchSize option to be greater than 0', () => { @@ -123,7 +123,7 @@ describe('Consumer', () => { handleMessage, batchSize: -1 }); - }); + }, 'SQS batchSize option must be between 1 and 10.'); }); it('requires visibilityTimeout to be set with heartbeatInterval', () => { @@ -134,7 +134,7 @@ describe('Consumer', () => { handleMessage, heartbeatInterval: 30 }); - }); + }, 'heartbeatInterval must be less than visibilityTimeout.'); }); it('requires heartbeatInterval to be less than visibilityTimeout', () => { @@ -146,7 +146,7 @@ describe('Consumer', () => { heartbeatInterval: 30, visibilityTimeout: 30 }); - }); + }, 'heartbeatInterval must be less than visibilityTimeout.'); }); describe('.create', () => { @@ -1175,6 +1175,77 @@ describe('Consumer', () => { }); }); + describe('updateOption', async () => { + it('updates the visibilityTimeout option and emits an event', () => { + const optionUpdatedListener = sandbox.stub(); + consumer.on('option_updated', optionUpdatedListener); + + consumer.updateOption('visibilityTimeout', 45); + + assert.equal(consumer.visibilityTimeout, 45); + + sandbox.assert.calledWithMatch( + optionUpdatedListener, + 'visibilityTimeout', + 45 + ); + }); + + it('does not update the visibilityTimeout if the value is not a number', () => { + consumer = new Consumer({ + region: REGION, + queueUrl: QUEUE_URL, + handleMessage, + visibilityTimeout: 60 + }); + + const optionUpdatedListener = sandbox.stub(); + consumer.on('option_updated', optionUpdatedListener); + + assert.throws(() => { + consumer.updateOption('visibilityTimeout', 'value'); + }, 'visibilityTimeout must be a number'); + + assert.equal(consumer.visibilityTimeout, 60); + + sandbox.assert.notCalled(optionUpdatedListener); + }); + + it('does not update the visibilityTimeout if the value is less than the heartbeatInterval', () => { + consumer = new Consumer({ + region: REGION, + queueUrl: QUEUE_URL, + handleMessage, + heartbeatInterval: 30, + visibilityTimeout: 60 + }); + + const optionUpdatedListener = sandbox.stub(); + consumer.on('option_updated', optionUpdatedListener); + + assert.throws(() => { + consumer.updateOption('visibilityTimeout', 30); + }, 'heartbeatInterval must be less than visibilityTimeout.'); + + assert.equal(consumer.visibilityTimeout, 60); + + sandbox.assert.notCalled(optionUpdatedListener); + }); + + it('throws an error for an unknown option', () => { + consumer = new Consumer({ + region: REGION, + queueUrl: QUEUE_URL, + handleMessage, + visibilityTimeout: 60 + }); + + assert.throws(() => { + consumer.updateOption('unknown', 'value'); + }, `The update unknown cannot be updated`); + }); + }); + describe('delete messages property', () => { beforeEach(() => { consumer = new Consumer({