Skip to content

Commit

Permalink
Merge pull request Azure#94 from amarzavery/recovery
Browse files Browse the repository at this point in the history
multiple fixes
  • Loading branch information
amarzavery authored Jul 12, 2018
2 parents 12c35b4 + afc3e26 commit e04dc39
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 120 deletions.
3 changes: 2 additions & 1 deletion client/lib/amqp-common/auth/aad.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ export class AadTokenProvider implements TokenProvider {
if (result.expiresOn && result.expiresOn instanceof Date) {
expiresOn = result.expiresOn.getTime();
}
const expiry = Math.floor(expiresOn / 1000) + self.tokenValidTimeInSeconds - 5;
const expiry = Math.floor(expiresOn / 1000) +
self.tokenValidTimeInSeconds - Constants.aadTokenValidityMarginSeconds;
const tokenObj: TokenInfo = {
expiry: expiry,
tokenType: TokenType.CbsTokenTypeJwt,
Expand Down
38 changes: 29 additions & 9 deletions client/lib/amqp-common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,16 @@ export enum ConditionErrorNameMapper {
/**
* Error is thrown when an incorrect argument was received.
*/
"com.microsoft:argument-error" = "ArgumentError"
"com.microsoft:argument-error" = "ArgumentError",
/**
* Error is thrown when server cancels the operation due to an internal issue.
*/
"com.microsoft:operation-cancelled" = "OperationCancelledError", // Retryable
/**
* Error is thrown when the client sender does not have enough link credits to send the message.
*/
"client.sender:not-enough-link-credit" = "SenderBusyError" // Retryable

}

/**
Expand Down Expand Up @@ -302,7 +311,7 @@ export enum ErrorNameConditionMapper {
/**
* Error is thrown when input was received for a link that was detached with an error.
*/
ErrantLinkError = "amqp:session:errant-link", // Retryable
ErrantLinkError = "amqp:session:errant-link",
/**
* Error is thrown when an attach was received using a handle that is already in use for an attached link.
*/
Expand Down Expand Up @@ -331,7 +340,15 @@ export enum ErrorNameConditionMapper {
/**
* Error is thrown when an incorrect argument was received.
*/
ArgumentError = "com.microsoft:argument-error"
ArgumentError = "com.microsoft:argument-error",
/**
* Error is thrown when server cancels the operation due to an internal issue.
*/
OperationCancelledError = "com.microsoft:operation-cancelled", // Retryable
/**
* Error is thrown when the client sender does not have enough link credits to send the message.
*/
SenderBusyError = "client.sender:not-enough-link-credit" // Retryable
}

/**
Expand All @@ -354,9 +371,9 @@ export class MessagingError extends Error {
translated: boolean = true;
/**
*
* @property {boolean} retryable Describes whether the error is retryable. Default: false.
* @property {boolean} retryable Describes whether the error is retryable. Default: true.
*/
retryable: boolean = false;
retryable: boolean = true;
/**
* @property {any} [info] Any additional error information given by the service.
*/
Expand All @@ -369,6 +386,11 @@ export class MessagingError extends Error {
}
}

export const retryableErrors: string[] = [
"InternalServerError", "ServerBusyError", "ServiceUnavailableError", "OperationCancelledError",
"SenderBusyError", "MessagingError"
];

/**
* Translates the AQMP error received at the protocol layer or a generic Error into an MessagingError.
*
Expand All @@ -393,10 +415,8 @@ export function translate(err: AmqpError | Error): MessagingError {
description.match(/The messaging entity .* could not be found.*/i) !== null)) {
error.name = "MessagingEntityNotFoundError";
}
if (error.name === "InternalServerError"
|| error.name === "ServerBusyError"
|| error.name === "ServiceUnavailableError") {
error.retryable = true;
if (retryableErrors.indexOf(error.name) === -1) { // not found
error.retryable = false;
}
return error;
} else {
Expand Down
2 changes: 1 addition & 1 deletion client/lib/amqp-common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export { Dictionary, Message as AmqpMessage } from "../rhea-promise";
export { ConnectionConfig } from "./connectionConfig";
export {
MessagingError, ErrorNameConditionMapper, ConditionStatusMapper, ConditionErrorNameMapper,
translate
translate, retryableErrors
} from "./errors";
export { RequestResponseLink } from "./requestResponseLink";
export { retry } from "./retry";
Expand Down
1 change: 1 addition & 0 deletions client/lib/amqp-common/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,4 @@ export const maxDurationValue = 922337203685477;
export const minDurationValue = -922337203685477;
// https://github.com/Azure/azure-amqp/blob/master/Microsoft.Azure.Amqp/Amqp/AmqpConstants.cs#L47
export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTime();
export const aadTokenValidityMarginSeconds = 5;
53 changes: 14 additions & 39 deletions client/lib/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export class EventHubClient {
/**
* Closes the AMQP connection to the Event Hub for this client,
* returning a promise that will be resolved when disconnection is completed.
* @returns {Promise<any>}
* @returns {Promise<void>} Promise<void>
*/
async close(): Promise<any> {
async close(): Promise<void> {
try {
if (this._context.connection.isOpen()) {
// Close all the senders.
Expand Down Expand Up @@ -149,7 +149,7 @@ export class EventHubClient {
* if you intend to send the event to a specific partition. When not specified EventHub will store the messages in a round-robin
* fashion amongst the different partitions in the EventHub.
*
* @returns {Promise<Delivery>} Promise<rheaPromise.Delivery>
* @returns {Promise<Delivery>} Promise<Delivery>
*/
async send(data: EventData, partitionId?: string | number): Promise<Delivery> {
const sender = EventHubSender.create(this._context, partitionId);
Expand All @@ -165,7 +165,7 @@ export class EventHubClient {
* if you intend to send the event to a specific partition. When not specified EventHub will store the messages in a round-robin
* fashion amongst the different partitions in the EventHub.
*
* @return {Promise<rheaPromise.Delivery>} Promise<rheaPromise.Delivery>
* @return {Promise<Delivery>} Promise<Delivery>
*/
async sendBatch(datas: EventData[], partitionId?: string | number): Promise<Delivery> {
const sender = EventHubSender.create(this._context, partitionId);
Expand All @@ -180,19 +180,7 @@ export class EventHubClient {
* @param {OnMessage} onMessage The message handler to receive event data objects.
* @param {OnError} onError The error handler to receive an error that occurs
* while receiving messages.
* @param {ReceiveOptions} [options] Options for how you'd like to connect.
* @param {string} [options.name] The name of the receiver. If not provided
* then we will set a GUID by default.
* @param {string} [options.consumerGroup] Consumer group from which to receive.
* @param {number} [options.prefetchCount] The upper limit of events this receiver will
* actively receive regardless of whether a receive operation is pending.
* @param {boolean} [options.enableReceiverRuntimeMetric] Provides the approximate receiver runtime information
* for a logical partition of an Event Hub if the value is true. Default false.
* @param {number} [options.epoch] The epoch value that this receiver is currently
* using for partition ownership. A value of undefined means this receiver is not an epoch-based receiver.
* @param {EventPosition} [options.eventPosition] The position of EventData in the EventHub parition from
* where the receiver should start receiving. Only one of offset, sequenceNumber, enqueuedTime, customFilter can be specified.
* `EventPosition.withCustomFilter()` should be used if you want more fine-grained control of the filtering.
* @param {ReceiveOptions} [options] Options for how you'd like to receive messages.
*
* @returns {ReceiveHandler} ReceiveHandler - An object that provides a mechanism to stop receiving more messages.
*/
Expand All @@ -214,21 +202,9 @@ export class EventHubClient {
* @param {number} maxMessageCount The maximum message count. Must be a value greater than 0.
* @param {number} [maxWaitTimeInSeconds] The maximum wait time in seconds for which the Receiver should wait
* to receiver the said amount of messages. If not provided, it defaults to 60 seconds.
* @param {ReceiveOptions} [options] Options for how you'd like to connect.
* @param {string} [options.name] The name of the receiver. If not provided
* then we will set a GUID by default.
* @param {string} [options.consumerGroup] Consumer group from which to receive.
* @param {number} [options.prefetchCount] The upper limit of events this receiver will
* actively receive regardless of whether a receive operation is pending.
* @param {boolean} [options.enableReceiverRuntimeMetric] Provides the approximate receiver runtime information
* for a logical partition of an Event Hub if the value is true. Default false.
* @param {number} [options.epoch] The epoch value that this receiver is currently
* using for partition ownership. A value of undefined means this receiver is not an epoch-based receiver.
* @param {EventPosition} [options.eventPosition] The position of EventData in the EventHub parition from
* where the receiver should start receiving. Only one of offset, sequenceNumber, enqueuedTime, customFilter can be specified.
* `EventPosition.withCustomFilter()` should be used if you want more fine-grained control of the filtering.
* @param {ReceiveOptions} [options] Options for how you'd like to receive messages.
*
* @returns {Array<EventData>} A promise that resolves with an array of EventData objects.
* @returns {Promise<Array<EventData>>} Promise<Array<EventData>>.
*/
async receiveBatch(partitionId: string | number, maxMessageCount: number, maxWaitTimeInSeconds?: number, options?: ReceiveOptions): Promise<EventData[]> {
if (!partitionId || (partitionId && typeof partitionId !== "string" && typeof partitionId !== "number")) {
Expand Down Expand Up @@ -323,7 +299,7 @@ export class EventHubClient {
* Creates an EventHub Client from connection string.
* @param {string} iothubConnectionString - Connection string of the form 'HostName=iot-host-name;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key'
* @param {ClientOptions} [options] Options that can be provided during client creation.
* @returns {Promise<EventHubClient>} - A promise that resolves with EventHubClient.
* @returns {Promise<EventHubClient>} - Promise<EventHubClient>.
*/
static async createFromIotHubConnectionString(iothubConnectionString: string, options?: ClientOptions): Promise<EventHubClient> {
if (!iothubConnectionString || (iothubConnectionString && typeof iothubConnectionString !== "string")) {
Expand Down Expand Up @@ -354,16 +330,15 @@ export class EventHubClient {
throw new Error("'entityPath' is a required parameter and must be of type: 'string'.");
}

if (!credentials ||
!(credentials instanceof ApplicationTokenCredentials ||
credentials instanceof UserTokenCredentials ||
credentials instanceof DeviceTokenCredentials ||
credentials instanceof MSITokenCredentials)) {
throw new Error("'credentials' is a required parameter and must be an instance of ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials.");
if (!credentials) {
throw new Error("'credentials' is a required parameter and must be an instance of " +
"ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | " +
"MSITokenCredentials.");
}

if (!host.endsWith("/")) host += "/";
const connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue`;
const connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;` +
`SharedAccessKey=defaultKeyValue`;
if (!options) options = {};
const clientOptions: ClientOptions = options;
clientOptions.tokenProvider = new AadTokenProvider(credentials);
Expand Down
15 changes: 10 additions & 5 deletions client/lib/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "./rhea-promise";
import { EventData } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { defaultLock, Func, retry, translate, AmqpMessage } from "./amqp-common";
import { defaultLock, Func, retry, translate, AmqpMessage, ErrorNameConditionMapper } from "./amqp-common";
import { LinkEntity } from "./linkEntity";

const debug = debugModule("azure:event-hubs:sender");
Expand Down Expand Up @@ -294,7 +294,7 @@ export class EventHubSender extends LinkEntity {
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(`[${this._context.connectionId}]Sender '${this.name}', ` +
err = new Error(`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`);
}
reject(err);
Expand All @@ -306,7 +306,7 @@ export class EventHubSender extends LinkEntity {
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(`[${this._context.connectionId}]Sender "${this.name}", ` +
err = new Error(`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`);
}
reject(err);
Expand All @@ -319,14 +319,19 @@ export class EventHubSender extends LinkEntity {
debug("[%s] Sender '%s', sent message with delivery id: %d and tag: %s",
this._context.connectionId, this.name, delivery.id, delivery.tag.toString());
} else {
// let us retry to send the message after some time.
const msg = `[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
debug(msg);
reject(new Error(msg));
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
reject(translate(amqpError));
}
});

return retry<Delivery>(sendEventPromise);
return retry<Delivery>(sendEventPromise, 3, 5);
}

/**
Expand Down
35 changes: 18 additions & 17 deletions client/lib/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,6 @@ export class LinkEntity {
this.name = options.name || uuid();
this.partitionId = options.partitionId;
}
/**
* Provides the current type of the LinkEntity.
* @return {string} The entity type.
*/
get type(): string {
let result = "LinkEntity";
if ((this as any).constructor && (this as any).constructor.name) {
result = (this as any).constructor.name;
}
return result;
}

/**
* Negotiates cbs claim for the LinkEntity.
Expand All @@ -119,20 +108,20 @@ export class LinkEntity {
// cbs session, since we want to have exactly 1 cbs session per connection).
debug("[%s] Acquiring cbs lock: '%s' for creating the cbs session while creating the %s: " +
"'%s' with address: '%s'.", this._context.connectionId, this._context.cbsSession.cbsLock,
this.type, this.name, this.address);
this._type, this.name, this.address);
await defaultLock.acquire(this._context.cbsSession.cbsLock,
() => { return this._context.cbsSession.init(); });
const tokenObject = await this._context.tokenProvider.getToken(this.audience);
debug("[%s] %s: calling negotiateClaim for audience '%s'.",
this._context.connectionId, this.type, this.audience);
this._context.connectionId, this._type, this.audience);
// Acquire the lock to negotiate the CBS claim.
debug("[%s] Acquiring cbs lock: '%s' for cbs auth for %s: '%s' with address '%s'.",
this._context.connectionId, this._context.negotiateClaimLock, this.type, this.name, this.address);
this._context.connectionId, this._context.negotiateClaimLock, this._type, this.name, this.address);
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject);
});
debug("[%s] Negotiated claim for %s '%s' with with address: %s",
this._context.connectionId, this.type, this.name, this.address);
this._context.connectionId, this._type, this.name, this.address);
if (setTokenRenewal) {
await this._ensureTokenRenewal();
}
Expand All @@ -152,11 +141,23 @@ export class LinkEntity {
await this._negotiateClaim(true);
} catch (err) {
debug("[%s] %s '%s' with address %s, an error occurred while renewing the token: %O",
this._context.connectionId, this.type, this.name, this.address, err);
this._context.connectionId, this._type, this.name, this.address, err);
}
}, nextRenewalTimeout);
debug("[%s] %s '%s' with address %s, has next token renewal in %d seconds @(%s).",
this._context.connectionId, this.type, this.name, this.address, nextRenewalTimeout / 1000,
this._context.connectionId, this._type, this.name, this.address, nextRenewalTimeout / 1000,
new Date(Date.now() + nextRenewalTimeout).toString());
}

/**
* Provides the current type of the LinkEntity.
* @return {string} The entity type.
*/
private get _type(): string {
let result = "LinkEntity";
if ((this as any).constructor && (this as any).constructor.name) {
result = (this as any).constructor.name;
}
return result;
}
}
2 changes: 0 additions & 2 deletions client/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
"allowJs": false,
"noUnusedLocals":true,
"strict": true,
"strictPropertyInitialization": true,
"strictNullChecks": true,
"declaration": true,
"declarationDir": "./typings",
},
Expand Down
15 changes: 1 addition & 14 deletions processor/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,16 @@
"preserveConstEnums": true,
"sourceMap": true,
"newLine": "LF",
"target": "es6",
"target": "es2017",
"moduleResolution": "node",
"noImplicitReturns": true,
"importHelpers": true,
"outDir": "dist/lib",
"allowJs": false,
"noUnusedLocals":true,
"strict": true,
"strictPropertyInitialization": true,
"strictNullChecks": true,
"declaration": true,
"declarationDir": "./typings/lib",
"lib": [
"es2015",
"dom",
"dom.iterable",
"es5",
"es6",
"es7",
"esnext",
"esnext.asynciterable",
"es2015.iterable"
]
},
"compileOnSave": true,
"exclude": [
Expand Down
8 changes: 0 additions & 8 deletions processor/tslint.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,5 @@
"check-seperator",
"check-type"
]
},
"cliOptions": {
"exclude": [
"examples",
"tests",
"testhub/*.ts",
"client"
]
}
}
Loading

0 comments on commit e04dc39

Please sign in to comment.