Skip to content

Commit

Permalink
Merge pull request #87 from chradek/update-deps
Browse files Browse the repository at this point in the history
updates rhea-promise to 2.0.0 to match rhea dependency being updated to 2.x
  • Loading branch information
chradek committed Jun 3, 2021
2 parents 8873a97 + 36f2afe commit 93961ff
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 73 deletions.
58 changes: 47 additions & 11 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,90 +1,126 @@
### 2.0.0 - (2021-06-03)

- Updates rhea dependency to the 2.x major version, and the tslib dependency to the 2.x major version.
- Adds `CreateRequestResponseLinkOptions` as an exported interface.

#### Breaking changes

- rhea has 1 breaking change introduced in version 2.x: timestamps are not deserialized as Date objects instead of numbers.
- Updates `AwaitableSendOptions` to include the optional fields `tag` and `format` which were previously passed to `AwaitableSender.send()`. These fields are no longer positional arguments on `AwaitableSender.send()`.
- Adds `SenderSendOptions` to include the optional fields `tag` and `format` which were previously passed to `Sender.send()`. These fields are no longer positional arguments on `Sender.send()`.
- Removes `sendTimeoutInSeconds` from the `AwaitableSendOptions` that is passed to the `AwaitableSender` constructor. `timeoutInSeconds` on `AwaitableSenderOptions` can still be used to set the timeout for individual `AwaitableSender.send()` invocations.
- Renames the following TypeScript interfaces to better match the methods they apply to:
- SenderOptionsWithSession -> CreateSenderOptions
- AwaitableSenderOptionsWithSession -> CreateAwaitableSenderOptions
- ReceiverOptionsWithSession -> CreateReceiverOptions

### 1.2.1 - (2021-04-15)

- `createSession`, `createReceiver`, and `createSender` methods now only close underlying rhea analogue when cancelled if the resource has already been opened.

### 1.2.0 - 2021-03-25

- Exposes the `incoming` getter on the `Session` that lets accessing size and capacity of the incoming deliveries [#79](https://github.com/amqp/rhea-promise/pull/79).
- Updates the error message for the `AbortError` to be a standard message `The operation was aborted.`.

### 1.1.0 - 2021-02-08

- All async methods now take a signal that can be used to cancel the operation. Fixes [#48](https://github.com/amqp/rhea-promise/issues/48)
- Added a `timeoutInSeconds` parameter to the `send` method on the `AwaitableSender` that overrides the timeout value for the send operation set when creating the sender.
- When the `error` event is fired when closing the sender/receiver link, surface errors occurring on the sender/receiver context if none are found on the session context. Details can be found in [PR #55](https://github.com/amqp/rhea-promise/pull/55)
- Updated minimum version of `rhea` to `^1.0.24`. Details can be found in [PR 68](https://github.com/amqp/rhea-promise/pull/68)

### 1.0.0 - 2019-06-27

- Updated minimum version of `rhea` to `^1.0.8`.
- Added a read only property `id` to the `Session` object. The id property is created by concatenating session's local channel, remote channel and the connection id `"local-<number>_remote-<number>_<connection-id>"`, thus making it unique for that connection.
- Improved log statements by adding the session `id` and the sender, receiver `name` to help while debugging applications.
- Added `options` to `Link.close({closeSession: true | false})`, thus the user can specify whether the underlying session should be closed while closing the `Sender|Receiver`. Default is `true`.
- Improved `open` and `close` operations on `Connection`, `Session` and `Link` by creating timer in case the connection gets disconnected. Fixes [#41](https://github.com/amqp/rhea-promise/issues/41).
- The current `Sender` does not have a provision of **"awaiting"** on sending a message. The user needs to add handlers on the `Sender` for `accepted`, `rejected`, `released`, `modified` to ensure whether the message was successfully sent.
Now, we have added a new `AwaitableSender` which adds the handlers internally and provides an **awaitable** `send()` operation to the customer. Fixes [#45](https://github.com/amqp/rhea-promise/issues/45).
Now, we have added a new `AwaitableSender` which adds the handlers internally and provides an **awaitable** `send()` operation to the customer. Fixes [#45](https://github.com/amqp/rhea-promise/issues/45).
- Exporting new Errors:
- `InsufficientCreditError`: Defines the error that occurs when the Sender does not have enough credit.
- `SendOperationFailedError`: Defines the error that occurs when the Sender fails to send a message.
- `InsufficientCreditError`: Defines the error that occurs when the Sender does not have enough credit.
- `SendOperationFailedError`: Defines the error that occurs when the Sender fails to send a message.

### 0.2.0 - 2019-05-17

- Updated `OperationTimeoutError` to be a non-AMQP Error as pointed out in [#42](https://github.com/amqp/rhea-promise/issues/42). Fixed in [PR](https://github.com/amqp/rhea-promise/pull/43).

### 0.1.15 - 2019-04-10

- Export rhea types for `Typed`. [PR](https://github.com/amqp/rhea-promise/pull/36).
- Export rhea types for `WebSocketImpl` and `WebSocketInstance`. [PR](https://github.com/amqp/rhea-promise/pull/38).
- Export rhea types for `WebSocketImpl` and `WebSocketInstance`. [PR](https://github.com/amqp/rhea-promise/pull/38).
- When opening a connection fails with no error, use standard error message. [PR](https://github.com/amqp/rhea-promise/pull/27).

### 0.1.14 - 2019-03-19

- Allow websockets usage on a connection without creating a container first. [PR](https://github.com/amqp/rhea-promise/pull/32).
- New function `removeAllSessions()` on the connection to clear the internal map in rhea to ensure
sessions are not reconnected on the next `connection.open()` call. [PR](https://github.com/amqp/rhea-promise/pull/33).
- New function `removeAllSessions()` on the connection to clear the internal map in rhea to ensure
sessions are not reconnected on the next `connection.open()` call. [PR](https://github.com/amqp/rhea-promise/pull/33).
- Remove all event listeners on link and session objects when `close()` is called on them. [PR](https://github.com/amqp/rhea-promise/pull/34)

### 0.1.13 - 2018-12-11

- Throw `OperationTimeoutError` when a Promise to create/close an entity is rejected.

### 0.1.12 - 2018-11-16

- Fix a minor bug in receiver creation.

### 0.1.11 - 2018-11-15

- Added checks for some event handler methods to exist before logging information that uses node's
event handlers inbuilt functions.
event handlers inbuilt functions.
- Improved error checking while creating the receiver.

### 0.1.10 - 2018-11-01

- Provided an option to add an event handler for "settled" event on the Receiver.

### 0.1.9 - 2018-10-24

- With the usage of `importHelpers`, the tslib will be needed in package.json for installers using older versions of npm (or using yarn). [PR](https://github.com/amqp/rhea-promise/pull/16).

### 0.1.8 - 2018-10-22

- Allow setting drain property on the receiver [PR](https://github.com/amqp/rhea-promise/pull/14).

### 0.1.7 - 2018-10-19

- Fixed a bug while populating the connectionId [PR](https://github.com/amqp/rhea-promise/pull/11).

### 0.1.6 - 2018-09-28

- property `actionInitiated` is now of type `number` which is incremented when the `create`, `close`
action on an entity is under process and decremented when the action completes (succeeeded or failed).
action on an entity is under process and decremented when the action completes (succeeeded or failed).

### 0.1.5 - 2018-09-27

- Improved log statements for better debugging.
- Any type of `error` event will be emitted with a tick delay. This would give enough time for the
`create()` methods to resolve the promise.
`create()` methods to resolve the promise.
- Added a new `boolean` property `actionInitiated` which indicates whether the `create`, `close`
action on an entity is under process.
action on an entity is under process.

### 0.1.4 - 2018-09-25

- `options` is a required property of `Connection` and `Container`.

### 0.1.3 - 2018-09-25

- Transform relevant objects in rhea EventContext to rhea-promise objects.
- Ensure that `container.createConnection()` creates a connection on that container and not on
the default container.
the default container.

### 0.1.2 - 2018-09-20

- TS target to ES2015. This should help us support node.js version 6.x and above.

### 0.1.1 - 2018-09-20

- Update homepage, repository and bug urls in package.json

### 0.1.0 - 2018-09-20

- Initial version of rhea-promise.
3 changes: 1 addition & 2 deletions examples/awaitableSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async function main(): Promise<void> {
target: {
address: senderAddress
},
sendTimeoutInSeconds: 10
};

await connection.open();
Expand All @@ -51,7 +50,7 @@ async function main(): Promise<void> {
};
// Please, note that we are awaiting on sender.send() to complete.
// You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
const delivery: Delivery = await sender.send(message);
const delivery: Delivery = await sender.send(message, {timeoutInSeconds: 10});
console.log(
"[%s] await sendMessage -> Delivery id: %d, settled: %s",
connection.id,
Expand Down
41 changes: 16 additions & 25 deletions lib/awaitableSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ export declare interface AwaitableSender {
}

export interface AwaitableSenderOptions extends BaseSenderOptions {
/**
* The duration in which the promise to send the message should complete (resolve/reject).
* If it is not completed, then the Promise will be rejected after timeout occurs.
* Default: `20 seconds`.
*/
sendTimeoutInSeconds?: number;
}

export interface AwaitableSendOptions {
Expand All @@ -54,19 +48,23 @@ export interface AwaitableSendOptions {
* sent. It only stops listening for an acknowledgement from the remote endpoint.
*/
abortSignal?: AbortSignalLike;
/**
* The message format. Specify this if a message with custom format needs to be sent.
* `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the
* given message is assumed to be of type Message interface and encoded appropriately.
*/
format?: number;
/**
* The message tag if any.
*/
tag?: Buffer | string;
}

/**
* Describes the sender where one can await on the message being sent.
* @class AwaitableSender
*/
export class AwaitableSender extends BaseSender {
/**
* The duration in which the promise to send the message should complete (resolve/reject).
* If it is not completed, then the Promise will be rejected after timeout occurs.
* Default: `20 seconds`.
*/
sendTimeoutInSeconds: number;
/**
* @property {Map<number, PromiseLike} deliveryDispositionMap Maintains a map of delivery of
* messages that are being sent. It acts as a store for correlating the dispositions received
Expand All @@ -76,7 +74,6 @@ export class AwaitableSender extends BaseSender {

constructor(session: Session, sender: RheaSender, options: AwaitableSenderOptions = {}) {
super(session, sender, options);
this.sendTimeoutInSeconds = options.sendTimeoutInSeconds || 20;
/**
* The handler that will be added on the Sender for `accepted` event. If the delivery id is
* present in the disposition map then it will clear the timer and resolve the promise with the
Expand Down Expand Up @@ -181,22 +178,18 @@ export class AwaitableSender extends BaseSender {
* @param {Message | Buffer} msg The message to be sent. For default AMQP format msg parameter
* should be of type Message interface. For a custom format, the msg parameter should be a Buffer
* and a valid value should be passed to the `format` argument.
* @param {Buffer | string} [tag] The message tag if any.
* @param {number} [format] The message format. Specify this if a message with custom format needs
* to be sent. `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the
* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AwaitableSendOptions} [options] Options to configure the timeout and cancellation for
* the send operation.
* @param {AwaitableSendOptions} [options] Options to configure the timeout, cancellation for
* the send operation and the tag and message format of the message.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, options?: AwaitableSendOptions): Promise<Delivery> {
send(msg: Message | Buffer, options: AwaitableSendOptions = {}): Promise<Delivery> {
return new Promise<Delivery>((resolve, reject) => {
log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d",
this.connection.id, this.name, this.session.id, this.credit,
this.session.outgoing.available());

const abortSignal = options && options.abortSignal;
const timeoutInSeconds = options && options.timeoutInSeconds;
const timeoutInSeconds = options.timeoutInSeconds || 20;

if (abortSignal && abortSignal.aborted) {
const err = createAbortError();
Expand All @@ -205,16 +198,14 @@ export class AwaitableSender extends BaseSender {
}

if (this.sendable()) {
let sendTimeoutInSeconds = this.sendTimeoutInSeconds;
if (typeof timeoutInSeconds === "number" && timeoutInSeconds > 0) sendTimeoutInSeconds = timeoutInSeconds;
const timer = setTimeout(() => {
this.deliveryDispositionMap.delete(delivery.id);
const message = `Sender '${this.name}' on amqp session ` +
`'${this.session.id}', with address '${this.address}' was not able to send the ` +
`message with delivery id ${delivery.id} right now, due to operation timeout.`;
log.error("[%s] %s", this.connection.id, message);
return reject(new OperationTimeoutError(message));
}, sendTimeoutInSeconds * 1000);
}, timeoutInSeconds * 1000);

const onAbort = () => {
if (this.deliveryDispositionMap.has(delivery.id)) {
Expand All @@ -236,7 +227,7 @@ export class AwaitableSender extends BaseSender {
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

const delivery = (this._link as RheaSender).send(msg, tag, format);
const delivery = (this._link as RheaSender).send(msg, options.tag, options.format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: (delivery: any) => {
resolve(delivery);
Expand Down
48 changes: 35 additions & 13 deletions lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,48 @@ import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender";
* a session if it was already created.
* @interface SenderOptionsWithSession
*/
export interface SenderOptionsWithSession extends SenderOptions {
export interface CreateSenderOptions extends SenderOptions {
session?: Session;
/**
* A signal used to cancel the Connection.createSender() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
* Describes the options that can be provided while creating an Async AMQP sender.
* One can also provide a session if it was already created.
* @interface AwaitableSenderOptionsWithSession
*/
export interface AwaitableSenderOptionsWithSession extends AwaitableSenderOptions {
export interface CreateAwaitableSenderOptions extends AwaitableSenderOptions {
session?: Session;
/**
* A signal used to cancel the Connection.createAwaitableSender() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
* Describes the options that can be provided while creating an AMQP receiver. One can also provide
* a session if it was already created.
* @interface ReceiverOptionsWithSession
*/
export interface ReceiverOptionsWithSession extends ReceiverOptions {
export interface CreateReceiverOptions extends ReceiverOptions {
session?: Session;
/**
* A signal used to cancel the Connection.createReceiver() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
* Describes the options that can be provided while creating an AMQP Request-Response link. One can also provide
* a session if it was already created.
*/
export interface CreateRequestResponseLinkOptions {
session?: Session;
/**
* A signal used to cancel the Connection.createRequestResponseLink() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
Expand Down Expand Up @@ -632,10 +654,10 @@ export class Connection extends Entity {
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {SenderOptionsWithSession} options Optional parameters to create a sender link.
* @param {CreateSenderOptions} options Optional parameters to create a sender link.
* @return {Promise<Sender>} Promise<Sender>.
*/
async createSender(options?: SenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Sender> {
async createSender(options?: CreateSenderOptions): Promise<Sender> {
if (options && options.session && options.session.createSender) {
return options.session.createSender(options);
}
Expand All @@ -655,7 +677,7 @@ export class Connection extends Entity {
*
* @return Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<AwaitableSender> {
async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise<AwaitableSender> {
if (options && options.session && options.session.createAwaitableSender) {
return options.session.createAwaitableSender(options);
}
Expand All @@ -668,10 +690,10 @@ export class Connection extends Entity {
* - **Resolves** the promise with the Sender object when rhea emits the "receiver_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.
* @param {CreateReceiverOptions} options Optional parameters to create a receiver link.
* @return {Promise<Receiver>} Promise<Receiver>.
*/
async createReceiver(options?: ReceiverOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Receiver> {
async createReceiver(options?: CreateReceiverOptions): Promise<Receiver> {
if (options && options.session && options.session.createReceiver) {
return options.session.createReceiver(options);
}
Expand All @@ -685,18 +707,18 @@ export class Connection extends Entity {
* style operations where one may want to send a request and await for response.
* @param {SenderOptions} senderOptions Parameters to create a sender.
* @param {ReceiverOptions} receiverOptions Parameters to create a receiver.
* @param {Session} [session] The optional session on which the sender and receiver links will be
* created.
* @param {CreateRequestResponseLinkOptions} [options] Optional parameters to control how sender and receiver link creation.
* @return {Promise<ReqResLink>} Promise<ReqResLink>
*/
async createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions,
providedSession?: Session, abortSignal?: AbortSignalLike): Promise<ReqResLink> {
options: CreateRequestResponseLinkOptions = {}): Promise<ReqResLink> {
if (!senderOptions) {
throw new Error(`Please provide sender options.`);
}
if (!receiverOptions) {
throw new Error(`Please provide receiver options.`);
}
const { session: providedSession, abortSignal } = options;
const session = providedSession || await this.createSession({ abortSignal });
const [sender, receiver] = await Promise.all([
session.createSender({ ...senderOptions, abortSignal }),
Expand Down
Loading

0 comments on commit 93961ff

Please sign in to comment.