Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding the ability to update visibilityTimeout #379

Merged
merged 7 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ By default, the value of `abort` is set to `false` which means pre existing requ

Returns the current polling state of the consumer: `true` if it is actively polling, `false` if it is not.

### `consumer.updateOption(option, value)`

Updates the provided option with the provided value.

You can [find out more about this here](https://bbc.github.io/sqs-consumer/public/classes/Consumer.html#updateOption).

### Events

Each consumer is an [`EventEmitter`](https://nodejs.org/api/events.html) and [emits these events](https://bbc.github.io/sqs-consumer/interfaces/Events.html).
Expand Down
32 changes: 31 additions & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import {
} from '@aws-sdk/client-sqs';
import Debug from 'debug';

import { ConsumerOptions, TypedEventEmitter, StopOptions } from './types';
import {
ConsumerOptions,
TypedEventEmitter,
StopOptions,
UpdatableOptions
} from './types';
import { autoBind } from './bind';
import {
SQSError,
Expand Down Expand Up @@ -134,6 +139,31 @@ export class Consumer extends TypedEventEmitter {
return !this.stopped;
}

/**
* Updates the provided option to the provided value.
* @param option The option that you want to update
* @param value The value to set the option to
*/
public updateOption(
option: UpdatableOptions,
value: ConsumerOptions[UpdatableOptions]
) {
switch (option) {
case 'visibilityTimeout':
if (typeof value === 'number') {
debug(`Updating the visibilityTimeout option to the value ${value}`);

this.visibilityTimeout = value;

this.emit('option_updated', option, value);
}

break;
default:
break;
}
}

/**
* Emit one of the consumer's error events depending on the error received.
* @param err The error object to forward on
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ export interface ConsumerOptions {
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}

export type UpdatableOptions = 'visibilityTimeout';

export interface StopOptions {
/**
* Default to `false`, if you want the stop action to also abort requests to SQS
Expand Down Expand Up @@ -155,6 +157,10 @@ export interface Events {
* Fired when the consumer finally stops its work.
*/
stopped: [];
/**
* Fired when an option is updated
*/
option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]];
}

export class TypedEventEmitter extends EventEmitter {
Expand Down
17 changes: 17 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,23 @@ describe('Consumer', () => {
});
});

describe('updateOption', async () => {
it('updates the visibilityTimeout option and emits an event', async () => {
const optionUpdatedListener = sandbox.stub();
consumer.on('option_updated', optionUpdatedListener);

consumer.updateOption('visibilityTimeout', 45);

assert.equal(consumer.visibilityTimeout, 45);

sandbox.assert.calledWithMatch(
optionUpdatedListener,
'visibilityTimeout',
45
);
});
});

describe('delete messages property', () => {
beforeEach(() => {
consumer = new Consumer({
Expand Down