Skip to content

Commit

Permalink
Revert merge from master
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya0820 committed Jul 16, 2019
1 parent c33a0fe commit 0eb5b15
Showing 1 changed file with 61 additions and 100 deletions.
161 changes: 61 additions & 100 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid from "uuid/v4";
import * as log from "./log";
import {
messageProperties,
Sender,
EventContext,
OnAmqpEvent,
Expand All @@ -29,7 +30,6 @@ import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { SendOptions, EventHubProducerOptions } from "./eventHubClient";
import { AbortSignalLike, AbortError } from "@azure/abort-controller";
import { EventDataBatch } from "./eventDataBatch";
import { getRetryAttemptTimeoutInMs } from "./eventHubClient";

/**
Expand Down Expand Up @@ -130,7 +130,7 @@ export class EventHubSender extends LinkEntity {
if (senderError) {
log.error(
"[%s] 'sender_close' event occurred for sender '%s' with address '%s'. " +
"The associated error is: %O",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
Expand All @@ -141,8 +141,8 @@ export class EventHubSender extends LinkEntity {
if (!this.isConnecting) {
log.error(
"[%s] 'sender_close' event occurred on the sender '%s' with address '%s' " +
"and the sdk did not initiate this. The sender is not reconnecting. Hence, calling " +
"detached from the _onAmqpClose() handler.",
"and the sdk did not initiate this. The sender is not reconnecting. Hence, calling " +
"detached from the _onAmqpClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -151,8 +151,8 @@ export class EventHubSender extends LinkEntity {
} else {
log.error(
"[%s] 'sender_close' event occurred on the sender '%s' with address '%s' " +
"and the sdk did not initate this. Moreover the sender is already re-connecting. " +
"Hence not calling detached from the _onAmqpClose() handler.",
"and the sdk did not initate this. Moreover the sender is already re-connecting. " +
"Hence not calling detached from the _onAmqpClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -161,8 +161,8 @@ export class EventHubSender extends LinkEntity {
} else {
log.error(
"[%s] 'sender_close' event occurred on the sender '%s' with address '%s' " +
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" +
"() handler.",
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" +
"() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -176,7 +176,7 @@ export class EventHubSender extends LinkEntity {
if (sessionError) {
log.error(
"[%s] 'session_close' event occurred for sender '%s' with address '%s'. " +
"The associated error is: %O",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
Expand All @@ -187,8 +187,8 @@ export class EventHubSender extends LinkEntity {
if (!this.isConnecting) {
log.error(
"[%s] 'session_close' event occurred on the session of sender '%s' with " +
"address '%s' and the sdk did not initiate this. Hence calling detached from the " +
"_onSessionClose() handler.",
"address '%s' and the sdk did not initiate this. Hence calling detached from the " +
"_onSessionClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -197,8 +197,8 @@ export class EventHubSender extends LinkEntity {
} else {
log.error(
"[%s] 'session_close' event occurred on the session of sender '%s' with " +
"address '%s' and the sdk did not initiate this. Moreover the sender is already " +
"re-connecting. Hence not calling detached from the _onSessionClose() handler.",
"address '%s' and the sdk did not initiate this. Moreover the sender is already " +
"re-connecting. Hence not calling detached from the _onSessionClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -207,8 +207,8 @@ export class EventHubSender extends LinkEntity {
} else {
log.error(
"[%s] 'session_close' event occurred on the session of sender '%s' with address " +
"'%s' because the sdk initiated it. Hence not calling detached from the _onSessionClose" +
"() handler.",
"'%s' because the sdk initiated it. Hence not calling detached from the _onSessionClose" +
"() handler.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -237,17 +237,17 @@ export class EventHubSender extends LinkEntity {
shouldReopen = true;
log.error(
"[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was an accompanying error an it is retryable. This is a candidate for re-establishing " +
"the sender link.",
"was an accompanying error an it is retryable. This is a candidate for re-establishing " +
"the sender link.",
this._context.connectionId,
this.name,
this.address
);
} else {
log.error(
"[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the sender link.",
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the sender link.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -257,8 +257,8 @@ export class EventHubSender extends LinkEntity {
shouldReopen = true;
log.error(
"[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was no accompanying error as well. This is a candidate for re-establishing " +
"the sender link.",
"was no accompanying error as well. This is a candidate for re-establishing " +
"the sender link.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -298,7 +298,7 @@ export class EventHubSender extends LinkEntity {
} catch (err) {
log.error(
"[%s] An error occurred while processing onDetached() of Sender '%s' with address " +
"'%s': %O",
"'%s': %O",
this._context.connectionId,
this.name,
this.address,
Expand Down Expand Up @@ -341,34 +341,6 @@ export class EventHubSender extends LinkEntity {
);
return result;
}
/**
* Returns maximum message size on the AMQP sender link.
* @ignore
* @returns Promise<number>
*/
async getMaxMessageSize(): Promise<number> {
try {
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
return this._sender!.maxMessageSize;
} catch (err) {
log.error(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
throw err;
}
}

/**
* Send a batch of EventData to the EventHub. The "message_annotations",
Expand All @@ -379,10 +351,7 @@ export class EventHubSender extends LinkEntity {
* @param options Options to control the way the events are batched along with request options
* @return Promise<void>
*/
async send(
events: EventData[] | EventDataBatch,
options?: SendOptions & EventHubProducerOptions
): Promise<void> {
async send(events: EventData[], options?: SendOptions & EventHubProducerOptions): Promise<void> {
try {
// throw an error if partition key and partition id are both defined
if (
Expand All @@ -401,56 +370,46 @@ export class EventHubSender extends LinkEntity {
throw error;
}

if (events instanceof EventDataBatch && options && options.partitionKey) {
// throw an error if partition key is different than the one provided in the options.
const error = new Error("Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead.");
log.error(
"[%s] Partition key is not supported when using createBatch(). %O",
this._context.connectionId,
error
);
throw error;
}

log.sender(
"[%s] Sender '%s', trying to send EventData[].",
this._context.connectionId,
this.name
);

let encodedBatchMessage: Buffer | undefined;
if (events instanceof EventDataBatch) {
encodedBatchMessage = events.batchMessage!;
} else {
const partitionKey = (options && options.partitionKey) || undefined;
const messages: AmqpMessage[] = [];
// Convert EventData to AmqpMessage.
for (let i = 0; i < events.length; i++) {
const message = toAmqpMessage(events[i], partitionKey);
message.body = this._context.dataTransformer.encode(events[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: message.data_sections(messages.map(message.encode))
};

// Set message_annotations of the first message as
// that of the envelope (batch message).
if (messages[0].message_annotations) {
batchMessage.message_annotations = messages[0].message_annotations;
const partitionKey = (options && options.partitionKey) || undefined;
const messages: AmqpMessage[] = [];
// Convert EventData to AmqpMessage.
for (let i = 0; i < events.length; i++) {
const message = toAmqpMessage(events[i], partitionKey);
message.body = this._context.dataTransformer.encode(events[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: message.data_sections(messages.map(message.encode))
};
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (messages[0].message_annotations) {
batchMessage.message_annotations = messages[0].message_annotations;
}
if (messages[0].application_properties) {
batchMessage.application_properties = messages[0].application_properties;
}
for (const prop of messageProperties) {
if ((messages[0] as any)[prop]) {
(batchMessage as any)[prop] = (messages[0] as any)[prop];
}

// Finally encode the envelope (batch message).
encodedBatchMessage = message.encode(batchMessage);
}

// Finally encode the envelope (batch message).
const encodedBatchMessage = message.encode(batchMessage);
log.sender(
"[%s] Sender '%s', sending encoded batch message.",
this._context.connectionId,
this.name,
encodedBatchMessage
);
return await this._trySendBatch(encodedBatchMessage, options);
return await this._trySendBatch(encodedBatchMessage, batchMessage.message_id, options);
} catch (err) {
log.error("An error occurred while sending the batch message %O", err);
throw err;
Expand Down Expand Up @@ -496,7 +455,9 @@ export class EventHubSender extends LinkEntity {
*/
private _trySendBatch(
message: AmqpMessage | Buffer,
options: SendOptions & EventHubProducerOptions = {}
tag: any,
options: SendOptions & EventHubProducerOptions = {},
format?: number
): Promise<void> {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;

Expand All @@ -513,7 +474,7 @@ export class EventHubSender extends LinkEntity {
const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
log.error(desc);
reject(new AbortError("The send operation has been cancelled by the user."));
Expand Down Expand Up @@ -553,7 +514,7 @@ export class EventHubSender extends LinkEntity {
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
`received a release disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
Expand All @@ -568,7 +529,7 @@ export class EventHubSender extends LinkEntity {
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
Expand Down Expand Up @@ -675,7 +636,7 @@ export class EventHubSender extends LinkEntity {
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
"possibly the connection.",
this.senderLock
);
defaultLock
Expand All @@ -701,8 +662,8 @@ export class EventHubSender extends LinkEntity {
const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
options.retryOptions.retryInterval &&
options.retryOptions.retryInterval >= 0
options.retryOptions.retryInterval &&
options.retryOptions.retryInterval >= 0
? options.retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds;
const config: RetryConfig<void> = {
Expand Down Expand Up @@ -730,7 +691,7 @@ export class EventHubSender extends LinkEntity {
if (!this.isOpen() && !this.isConnecting) {
log.error(
"[%s] The sender '%s' with address '%s' is not open and is not currently " +
"establishing itself. Hence let's try to connect.",
"establishing itself. Hence let's try to connect.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -768,7 +729,7 @@ export class EventHubSender extends LinkEntity {
} else {
log.error(
"[%s] The sender '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
"-> %s. Hence not reconnecting.",
this._context.connectionId,
this.name,
this.address,
Expand Down

0 comments on commit 0eb5b15

Please sign in to comment.