Skip to content

Commit

Permalink
feat: adding the ability to update visibilityTimeout (bbc#379)
Browse files Browse the repository at this point in the history
* feat: adding the ability to update options

* chore: updating README

* chore: formatting

* chore: updating validation

* chore: adding better test validation

* chore: breaking option update into its own func

* chore: formatting
  • Loading branch information
nicholasgriffintn authored Apr 10, 2023
1 parent ec37ba1 commit f692bbd
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 6 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ By default, the value of `abort` is set to `false` which means pre existing requ
Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.
### `consumer.updateOption(option, value)`
Updates the provided option with the provided value.
You can [find out more about this here](https://bbc.github.io/sqs-consumer/public/classes/Consumer.html#updateOption).
### Events
Each consumer is an [`EventEmitter`](https://nodejs.org/api/events.html) and [emits these events](https://bbc.github.io/sqs-consumer/interfaces/Events.html).
Expand Down
48 changes: 47 additions & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import {
} from '@aws-sdk/client-sqs';
import Debug from 'debug';

import { ConsumerOptions, TypedEventEmitter, StopOptions } from './types';
import {
ConsumerOptions,
TypedEventEmitter,
StopOptions,
UpdatableOptions
} from './types';
import { autoBind } from './bind';
import {
SQSError,
Expand Down Expand Up @@ -132,6 +137,47 @@ export class Consumer extends TypedEventEmitter {
return !this.stopped;
}

/**
* Updates visibilityTimeout to the provided value.
* @param value The value to set visibilityTimeout to
*/
private updateVisibilityTimeout(value: ConsumerOptions['visibilityTimeout']) {
if (typeof value !== 'number') {
throw new Error('visibilityTimeout must be a number');
}

if (
typeof value !== 'number' ||
(this.heartbeatInterval && value <= this.heartbeatInterval)
) {
throw new Error('heartbeatInterval must be less than visibilityTimeout.');
}

debug(`Updating the visibilityTimeout option to the value ${value}`);

this.visibilityTimeout = value;

this.emit('option_updated', 'visibilityTimeout', value);
}

/**
* Updates the provided option to the provided value.
* @param option The option that you want to update
* @param value The value to set the option to
*/
public updateOption(
option: UpdatableOptions,
value: ConsumerOptions[UpdatableOptions]
) {
switch (option) {
case 'visibilityTimeout':
this.updateVisibilityTimeout(value);
break;
default:
throw new Error(`The update ${option} cannot be updated`);
}
}

/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ export interface ConsumerOptions {
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}

export type UpdatableOptions = 'visibilityTimeout';

export interface StopOptions {
/**
* Default to `false`, if you want the stop action to also abort requests to SQS
Expand Down Expand Up @@ -155,6 +157,10 @@ export interface Events {
* Fired when the consumer finally stops its work.
*/
stopped: [];
/**
* Fired when an option is updated
*/
option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]];
}

export class TypedEventEmitter extends EventEmitter {
Expand Down
81 changes: 76 additions & 5 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ describe('Consumer', () => {
region: REGION,
queueUrl: QUEUE_URL
});
});
}, `Missing SQS consumer option [ handleMessage or handleMessageBatch ].`);
});

it('requires the batchSize option to be no greater than 10', () => {
Expand All @@ -112,7 +112,7 @@ describe('Consumer', () => {
handleMessage,
batchSize: 11
});
});
}, 'SQS batchSize option must be between 1 and 10.');
});

it('requires the batchSize option to be greater than 0', () => {
Expand All @@ -123,7 +123,7 @@ describe('Consumer', () => {
handleMessage,
batchSize: -1
});
});
}, 'SQS batchSize option must be between 1 and 10.');
});

it('requires visibilityTimeout to be set with heartbeatInterval', () => {
Expand All @@ -134,7 +134,7 @@ describe('Consumer', () => {
handleMessage,
heartbeatInterval: 30
});
});
}, 'heartbeatInterval must be less than visibilityTimeout.');
});

it('requires heartbeatInterval to be less than visibilityTimeout', () => {
Expand All @@ -146,7 +146,7 @@ describe('Consumer', () => {
heartbeatInterval: 30,
visibilityTimeout: 30
});
});
}, 'heartbeatInterval must be less than visibilityTimeout.');
});

describe('.create', () => {
Expand Down Expand Up @@ -1175,6 +1175,77 @@ describe('Consumer', () => {
});
});

describe('updateOption', async () => {
it('updates the visibilityTimeout option and emits an event', () => {
const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

consumer.updateOption('visibilityTimeout', 45);

assert.equal(consumer.visibilityTimeout, 45);

sandbox.assert.calledWithMatch(
optionUpdatedListener,
'visibilityTimeout',
45
);
});

it('does not update the visibilityTimeout if the value is not a number', () => {
consumer = new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
visibilityTimeout: 60
});

const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

assert.throws(() => {
consumer.updateOption('visibilityTimeout', 'value');
}, 'visibilityTimeout must be a number');

assert.equal(consumer.visibilityTimeout, 60);

sandbox.assert.notCalled(optionUpdatedListener);
});

it('does not update the visibilityTimeout if the value is less than the heartbeatInterval', () => {
consumer = new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
heartbeatInterval: 30,
visibilityTimeout: 60
});

const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

assert.throws(() => {
consumer.updateOption('visibilityTimeout', 30);
}, 'heartbeatInterval must be less than visibilityTimeout.');

assert.equal(consumer.visibilityTimeout, 60);

sandbox.assert.notCalled(optionUpdatedListener);
});

it('throws an error for an unknown option', () => {
consumer = new Consumer({
region: REGION,
queueUrl: QUEUE_URL,
handleMessage,
visibilityTimeout: 60
});

assert.throws(() => {
consumer.updateOption('unknown', 'value');
}, `The update unknown cannot be updated`);
});
});

describe('delete messages property', () => {
beforeEach(() => {
consumer = new Consumer({
Expand Down

0 comments on commit f692bbd

Please sign in to comment.