From 034eecc4aa2d7f506a847606d633127eae0f82d9 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 16 Jan 2018 11:29:27 -0800 Subject: [PATCH] Support for building Message objects from ArraySegments. --- .../Amqp/AmqpMessageConverter.cs | 4 +- .../Extensions/MessageInterOpExtensions.cs | 8 +- src/Microsoft.Azure.ServiceBus/Message.cs | 157 ++++++++++++------ ...rovals.ApproveAzureServiceBus.approved.txt | 2 + .../MessageTests.cs | 32 ++++ 5 files changed, 149 insertions(+), 54 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 37a58104..03eb2c09 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -93,7 +93,9 @@ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable sb public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage) { - var amqpMessage = sbMessage.Body == null ? AmqpMessage.Create() : AmqpMessage.Create(new Data { Value = new ArraySegment(sbMessage.Body) }); + var messageData = sbMessage.BackingData; + + var amqpMessage = messageData.Count <= 0 ? AmqpMessage.Create() : AmqpMessage.Create(new Data { Value = messageData }); amqpMessage.Properties.MessageId = sbMessage.MessageId; amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId; diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageInterOpExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageInterOpExtensions.cs index d2f2de2d..00cf5bb2 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageInterOpExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageInterOpExtensions.cs @@ -85,7 +85,9 @@ public static T GetBody(this Message message, XmlObjectSerializer serializer return (T)message.SystemProperties.BodyObject; } - if(message.Body == null || message.Body.Length == 0) + var messageData = message.BackingData; + + if(messageData.Count == 0) { return default; } @@ -95,9 +97,9 @@ public static T GetBody(this Message message, XmlObjectSerializer serializer serializer = DataContractBinarySerializer.Instance; } - using (var memoryStream = new MemoryStream(message.Body.Length)) + using (var memoryStream = new MemoryStream(messageData.Count)) { - memoryStream.Write(message.Body, 0, message.Body.Length); + memoryStream.Write(messageData.Array, messageData.Offset, messageData.Count); memoryStream.Flush(); memoryStream.Position = 0; return (T)serializer.ReadObject(memoryStream); diff --git a/src/Microsoft.Azure.ServiceBus/Message.cs b/src/Microsoft.Azure.ServiceBus/Message.cs index 3a3152a6..e951767d 100644 --- a/src/Microsoft.Azure.ServiceBus/Message.cs +++ b/src/Microsoft.Azure.ServiceBus/Message.cs @@ -26,12 +26,16 @@ public class Message /// public static string DeadLetterErrorDescriptionHeader = "DeadLetterErrorDescription"; + private static ArraySegment EmptyData = new ArraySegment(); + private string messageId; private string sessionId; private string replyToSessionId; private string partitionKey; private string viaPartitionKey; private TimeSpan timeToLive; + private byte[] body; + private ArraySegment? backingData; /// /// Creates a new Message @@ -41,13 +45,25 @@ public Message() { } + /// + /// Creates a new Message from the specified buffer backing storage. + /// + /// The buffer containing the message payload. + /// The zero-based index of the first byte in the message. + /// The number of bytes in the message. + public Message(byte[] buffer, int offset, int count) + : this(null) + { + this.backingData = new ArraySegment(buffer, offset, count); + } + /// /// Creates a new message from the specified payload. /// /// The payload of the message in bytes public Message(byte[] body) { - this.Body = body; + this.body = body; this.SystemProperties = new SystemPropertiesCollection(); this.UserProperties = new Dictionary(); } @@ -60,18 +76,54 @@ public Message(byte[] body) /// /// message.Body = System.Text.Encoding.UTF8.GetBytes("Message1"); /// + /// Note: In the event that a buffer is used as backing storage for this Message, accessing Body + /// will cause a memory allocation and data copy from the backing storage, fixing the Body. Writing + /// to the Body when a backing store is used will discard the backing storage completely. /// - public byte[] Body { get; set; } + public byte[] Body + { + get + { + if (this.backingData.HasValue) + { + this.body = new byte[this.backingData.Value.Count]; + Array.Copy(this.backingData.Value.Array, this.backingData.Value.Offset, this.body, 0, this.backingData.Value.Count); + this.backingData = null; + } + return this.body; + } + set + { + this.body = value; + if (this.backingData.HasValue) + this.backingData = null; + } + } + + /// + /// Gets the data behind the message body. + /// + public ArraySegment BackingData + { + get + { + if (this.backingData.HasValue) + return this.backingData.Value; + if (this.body != null) + return new ArraySegment(this.body, 0, this.body.Length); + return EmptyData; + } + } /// /// Gets or sets the MessageId to identify the message. /// /// - /// The message identifier is an application-defined value that uniquely identifies the - /// message and its payload. The identifier is a free-form string and can reflect a GUID - /// or an identifier derived from the application context. If enabled, the - /// duplicate detection - /// feature identifies and removes second and further submissions of messages with the + /// The message identifier is an application-defined value that uniquely identifies the + /// message and its payload. The identifier is a free-form string and can reflect a GUID + /// or an identifier derived from the application context. If enabled, the + /// duplicate detection + /// feature identifies and removes second and further submissions of messages with the /// same MessageId. /// public string MessageId @@ -88,9 +140,9 @@ public string MessageId /// Gets or sets a partition key for sending a message to a partitioned entity. /// The partition key. Maximum length is 128 characters. /// - /// For partitioned entities, - /// setting this value enables assigning related messages to the same internal partition, so that submission sequence - /// order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen + /// For partitioned entities, + /// setting this value enables assigning related messages to the same internal partition, so that submission sequence + /// order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen /// directly. For session-aware entities, the property overrides this value. /// public string PartitionKey @@ -107,7 +159,7 @@ public string PartitionKey /// Gets or sets a partition key for sending a message into an entity via a partitioned transfer queue. /// The partition key. Maximum length is 128 characters. /// - /// If a message is sent via a transfer queue in the scope of a transaction, this value selects the + /// If a message is sent via a transfer queue in the scope of a transaction, this value selects the /// transfer queue partition: This is functionally equivalent to and ensures that /// messages are kept together and in order as they are transferred. /// See Transfers and Send Via. @@ -126,9 +178,9 @@ public string ViaPartitionKey /// Gets or sets the session identifier for a session-aware entity. /// The session identifier. Maximum length is 128 characters. /// - /// For session-aware entities, this application-defined value specifies the session - /// affiliation of the message. Messages with the same session identifier are subject - /// to summary locking and enable exact in-order processing and demultiplexing. + /// For session-aware entities, this application-defined value specifies the session + /// affiliation of the message. Messages with the same session identifier are subject + /// to summary locking and enable exact in-order processing and demultiplexing. /// For session-unaware entities, this value is ignored. /// See Message Sessions. /// @@ -146,7 +198,7 @@ public string SessionId /// Gets or sets a session identifier augmenting the address. /// Session identifier. Maximum length is 128 characters. /// - /// This value augments the ReplyTo information and specifies which SessionId should be set + /// This value augments the ReplyTo information and specifies which SessionId should be set /// for the reply when sent to the reply entity. See Message Routing and Correlation /// public string ReplyToSessionId @@ -164,8 +216,8 @@ public string ReplyToSessionId /// The message expiration time in UTC. This property is read-only. /// If the message has not been received. For example if a new message was created but not yet sent and received. /// - /// The UTC instant at which the message is marked for removal and no longer available for retrieval - /// from the entity due to expiration. Expiry is controlled by the property + /// The UTC instant at which the message is marked for removal and no longer available for retrieval + /// from the entity due to expiration. Expiry is controlled by the property /// and this property is computed from + public DateTime ExpiresAtUtc { @@ -181,15 +233,15 @@ public DateTime ExpiresAtUtc } /// - /// Gets or sets the message’s "time to live" value. + /// Gets or sets the message’s "time to live" value. /// /// The message’s time to live value. /// - /// This value is the relative duration after which the message expires, starting from the instant - /// the message has been accepted and stored by the broker, as captured in . - /// When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. - /// A message-level value cannot be longer than the entity's DefaultTimeToLive - /// setting and it is silently adjusted if it does. + /// This value is the relative duration after which the message expires, starting from the instant + /// the message has been accepted and stored by the broker, as captured in . + /// When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. + /// A message-level value cannot be longer than the entity's DefaultTimeToLive + /// setting and it is silently adjusted if it does. /// See Expiration /// public TimeSpan TimeToLive @@ -214,7 +266,7 @@ public TimeSpan TimeToLive /// Gets or sets the a correlation identifier. /// Correlation identifier. /// - /// Allows an application to specify a context for the message for the purposes of correlation, + /// Allows an application to specify a context for the message for the purposes of correlation, /// for example reflecting the MessageId of a message that is being replied to. /// See Message Routing and Correlation. /// @@ -223,7 +275,7 @@ public TimeSpan TimeToLive /// Gets or sets an application specific label. /// The application specific label /// - /// This property enables the application to indicate the purpose of the message to the receiver in a standardized + /// This property enables the application to indicate the purpose of the message to the receiver in a standardized /// fashion, similar to an email subject line. The mapped AMQP property is "subject". /// public string Label { get; set; } @@ -231,9 +283,9 @@ public TimeSpan TimeToLive /// Gets or sets the "to" address. /// The "to" address. /// - /// This property is reserved for future use in routing scenarios and presently ignored by the broker itself. - /// Applications can use this value in rule-driven - /// auto-forward chaining scenarios to indicate the + /// This property is reserved for future use in routing scenarios and presently ignored by the broker itself. + /// Applications can use this value in rule-driven + /// auto-forward chaining scenarios to indicate the /// intended logical destination of the message. /// public string To { get; set; } @@ -241,7 +293,7 @@ public TimeSpan TimeToLive /// Gets or sets the content tpye descriptor. /// RFC2045 Content-Type descriptor. /// - /// Optionally describes the payload of the message, with a descriptor following the format of + /// Optionally describes the payload of the message, with a descriptor following the format of /// RFC2045, Section 5, for example "application/json". /// public string ContentType { get; set; } @@ -249,8 +301,8 @@ public TimeSpan TimeToLive /// Gets or sets the address of an entity to send replies to. /// The reply entity address. /// - /// This optional and application-defined value is a standard way to express a reply path - /// to the receiver of the message. When a sender expects a reply, it sets the value to the + /// This optional and application-defined value is a standard way to express a reply path + /// to the receiver of the message. When a sender expects a reply, it sets the value to the /// absolute or relative path of the queue or topic it expects the reply to be sent to. /// See Message Routing and Correlation. /// @@ -268,7 +320,7 @@ public TimeSpan TimeToLive /// /// Gets the total size of the message body in bytes. /// - public long Size => Body.Length; + public long Size => this.backingData.HasValue ? this.backingData.Value.Count : this.body.Length; /// /// Gets the "user properties" bag, which can be used for custom message metadata. @@ -293,19 +345,24 @@ public override string ToString() return string.Format(CultureInfo.CurrentCulture, $"{{MessageId:{this.MessageId}}}"); } - /// Clones a message, so that it is possible to send a clone of an already received - /// message as a new message. The system properties of original message + /// Clones a message, so that it is possible to send a clone of an already received + /// message as a new message. The system properties of original message /// are not copied. /// A cloned . public Message Clone() { var clone = (Message)this.MemberwiseClone(); clone.SystemProperties = new SystemPropertiesCollection(); - - if (this.Body != null) + if (this.backingData.HasValue) + { + clone.body = new byte[this.backingData.Value.Count]; + Array.Copy(this.backingData.Value.Array, this.backingData.Value.Offset, clone.body, 0, this.backingData.Value.Count); + clone.backingData = null; + } + else if (this.body != null) { - var clonedBody = new byte[this.Body.Length]; - Array.Copy(this.Body, clonedBody, this.Body.Length); + var clonedBody = new byte[this.body.Length]; + Array.Copy(this.body, clonedBody, this.body.Length); clone.Body = clonedBody; } return clone; @@ -363,9 +420,9 @@ public sealed class SystemPropertiesCollection /// Gets the lock token for the current message. /// /// - /// The lock token is a reference to the lock that is being held by the broker in mode. + /// The lock token is a reference to the lock that is being held by the broker in mode. /// Locks are used to explicitly settle messages as explained in the product documentation in more detail. - /// The token can also be used to pin the lock permanently through the Deferral API and, with that, take the message out of the + /// The token can also be used to pin the lock permanently through the Deferral API and, with that, take the message out of the /// regular delivery state flow. This property is read-only. /// public string LockToken => this.LockTokenGuid.ToString(); @@ -378,7 +435,7 @@ public sealed class SystemPropertiesCollection /// /// This value starts at 1. /// - /// Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires, + /// Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires, /// or the message is explicitly abandoned by the receiver. This property is read-only. /// public int DeliveryCount @@ -395,8 +452,8 @@ public int DeliveryCount /// Gets the date and time in UTC until which the message will be locked in the queue/subscription. /// The date and time until which the message will be locked in the queue/subscription. /// - /// For messages retrieved under a lock (peek-lock receive mode, not pre-settled) this property reflects the UTC - /// instant until which the message is held locked in the queue/subscription. When the lock expires, the + /// For messages retrieved under a lock (peek-lock receive mode, not pre-settled) this property reflects the UTC + /// instant until which the message is held locked in the queue/subscription. When the lock expires, the /// is incremented and the message is again available for retrieval. This property is read-only. /// public DateTime LockedUntilUtc @@ -412,9 +469,9 @@ public DateTime LockedUntilUtc /// Gets the unique number assigned to a message by Service Bus. /// - /// The sequence number is a unique 64-bit integer assigned to a message as it is accepted - /// and stored by the broker and functions as its true identifier. For partitioned entities, - /// the topmost 16 bits reflect the partition identifier. Sequence numbers monotonically increase. + /// The sequence number is a unique 64-bit integer assigned to a message as it is accepted + /// and stored by the broker and functions as its true identifier. For partitioned entities, + /// the topmost 16 bits reflect the partition identifier. Sequence numbers monotonically increase. /// They roll over to 0 when the 48-64 bit range is exhausted. This property is read-only. /// public long SequenceNumber @@ -432,7 +489,7 @@ public long SequenceNumber /// Gets the name of the queue or subscription that this message was enqueued on, before it was deadlettered. /// /// - /// Only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue + /// Only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue /// to another entity. Indicates the entity in which the message was dead-lettered. This property is read-only. /// public string DeadLetterSource @@ -460,7 +517,7 @@ internal short PartitionId /// Gets or sets the original sequence number of the message. /// The enqueued sequence number of the message. /// - /// For messages that have been auto-forwarded, this property reflects the sequence number + /// For messages that have been auto-forwarded, this property reflects the sequence number /// that had first been assigned to the message at its original point of submission. This property is read-only. /// public long EnqueuedSequenceNumber @@ -477,8 +534,8 @@ public long EnqueuedSequenceNumber /// Gets or sets the date and time of the sent time in UTC. /// The enqueue time in UTC. /// - /// The UTC instant at which the message has been accepted and stored in the entity. - /// This value can be used as an authoritative and neutral arrival time indicator when + /// The UTC instant at which the message has been accepted and stored in the entity. + /// This value can be used as an authoritative and neutral arrival time indicator when /// the receiver does not want to trust the sender's clock. This property is read-only. /// public DateTime EnqueuedTimeUtc diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 0bb7c5f2..cd1412ab 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -124,7 +124,9 @@ namespace Microsoft.Azure.ServiceBus public static string DeadLetterErrorDescriptionHeader; public static string DeadLetterReasonHeader; public Message() { } + public Message(byte[] buffer, int offset, int count) { } public Message(byte[] body) { } + public System.ArraySegment BackingData { get; } public byte[] Body { get; set; } public string ContentType { get; set; } public string CorrelationId { get; set; } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/MessageTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/MessageTests.cs index c6b5295d..47921fae 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/MessageTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/MessageTests.cs @@ -58,6 +58,38 @@ void TestClone() Assert.Equal(replyToSessionId, clone.ReplyToSessionId); } + [Fact] + void TestBackingData() + { + var buffer = new byte[] { 0x00, 0x01, 0xFF }; + + var message = new Message(buffer, 1, 1); + + Assert.Equal(1, message.Size); + Assert.Equal(buffer, message.BackingData.Array); + + var body = message.Body; + + Assert.NotNull(body); + Assert.NotEqual(body, buffer); + Assert.Equal(1, message.Size); + Assert.Equal(1, body.Length); + Assert.Equal(body[0], buffer[1]); + } + + [Fact] + void TestBackingDataClone() + { + var buffer = new byte[] { 0x00, 0x01, 0xFF }; + + var message = new Message(buffer, 1, 1); + + var clone = message.Clone(); + + Assert.NotEqual(buffer, clone.BackingData.Array); + Assert.Equal(1, clone.BackingData.Array.Length); + } + public class WhenQueryingIsReceivedProperty { [Fact]