From 525c2aa468784b92dbf4447b07d32db6b1880e2b Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Wed, 4 Mar 2020 17:08:59 -0800 Subject: [PATCH 1/3] [Service Bus] Add message batching --- .../src/Amqp/AmqpMessageBatch.cs | 162 ++++++++++ .../src/Amqp/AmqpMessageConverter.cs | 132 +++++--- .../src/Amqp/AmqpSender.cs | 297 ++++++++++++------ .../src/Core/TransportMessageBatch.cs | 65 ++++ .../src/Core/TransportSender.cs | 43 ++- .../src/Sender/CreateBatchOptions.cs | 85 +++++ .../src/Sender/ServiceBusMessageBatch.cs | 125 +++----- .../src/Sender/ServiceBusSender.cs | 118 ++++--- .../tests/ProcessorLiveTests.cs | 19 +- .../tests/ReceiverLiveTests.cs | 87 +++-- .../tests/SenderLiveTests.cs | 69 +++- .../tests/SenderTests.cs | 30 +- .../tests/SessionReceiverLiveTests.cs | 117 +++++-- 13 files changed, 995 insertions(+), 354 deletions(-) create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs new file mode 100644 index 0000000000000..5eeebf76f1791 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs @@ -0,0 +1,162 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using Azure.Core; +using Azure.Messaging.ServiceBus.Core; +using Microsoft.Azure.Amqp; + +namespace Azure.Messaging.ServiceBus.Amqp +{ + /// + /// A set of messages with known size constraints, based on messages to be sent + /// using an AMQP-based transport. + /// + /// + internal class AmqpMessageBatch : TransportMessageBatch + { + /// The amount of bytes to reserve as overhead for a small message. + private const byte OverheadBytesSmallMessage = 5; + + /// The amount of bytes to reserve as overhead for a large message. + private const byte OverheadBytesLargeMessage = 8; + + /// The maximum number of bytes that a message may be to be considered small. + private const byte MaximumBytesSmallMessage = 255; + + /// A flag that indicates whether or not the instance has been disposed. + private bool _disposed = false; + + /// The size of the batch, in bytes, as it will be sent via the AMQP transport. + private long _sizeBytes = 0; + + /// + /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as + /// well as any overhead for the batch itself when sent to the Queue/Topic. + /// + /// + public override long MaximumSizeInBytes { get; } + + /// + /// The size of the batch, in bytes, as it will be sent to the Queue/Topic + /// service. + /// + /// + public override long SizeInBytes => _sizeBytes; + + /// + /// The count of messages contained in the batch. + /// + /// + public override int Count => BatchMessages.Count; + + /// + /// The set of options to apply to the batch. + /// + /// + private CreateBatchOptions Options { get; } + + /// + /// The set of messages that have been added to the batch. + /// + /// + private List BatchMessages { get; } = new List(); + + /// + /// Initializes a new instance of the class. + /// + /// + /// The set of options to apply to the batch. + /// + public AmqpMessageBatch(CreateBatchOptions options) + { + Argument.AssertNotNull(options, nameof(options)); + Argument.AssertNotNull(options.MaximumSizeInBytes, nameof(options.MaximumSizeInBytes)); + + Options = options; + MaximumSizeInBytes = options.MaximumSizeInBytes.Value; + + // Initialize the size by reserving space for the batch envelope. + + using AmqpMessage envelope = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(Enumerable.Empty()); + _sizeBytes = envelope.SerializedMessageSize; + } + + /// + /// Attempts to add a message to the batch, ensuring that the size + /// of the batch does not exceed its maximum. + /// + /// + /// The message to attempt to add to the batch. + /// + /// true if the message was added; otherwise, false. + /// + public override bool TryAdd(ServiceBusMessage message) + { + Argument.AssertNotNull(message, nameof(message)); + Argument.AssertNotDisposed(_disposed, nameof(ServiceBusMessageBatch)); + + AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); + + try + { + // Calculate the size for the message, based on the AMQP message size and accounting for a + // bit of reserved overhead size. + + var size = _sizeBytes + + amqpMessage.SerializedMessageSize + + (amqpMessage.SerializedMessageSize <= MaximumBytesSmallMessage + ? OverheadBytesSmallMessage + : OverheadBytesLargeMessage); + + if (size > MaximumSizeInBytes) + { + return false; + } + + _sizeBytes = size; + BatchMessages.Add(message); + + return true; + } + finally + { + amqpMessage?.Dispose(); + } + } + + /// + /// Represents the batch as an enumerable set of transport-specific + /// representations of a message. + /// + /// + /// The transport-specific message representation being requested. + /// + /// The set of messages as an enumerable of the requested type. + /// + public override IEnumerable AsEnumerable() + { + if (typeof(T) != typeof(ServiceBusMessage)) + { + throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources1.UnsupportedTransportEventType, typeof(T).Name)); + } + + return (IEnumerable)BatchMessages; + } + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// + public override void Dispose() + { + _disposed = true; + + BatchMessages.Clear(); + _sizeBytes = 0; + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index c9233360faf06..7f8fa00f41760 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -6,9 +6,10 @@ namespace Azure.Messaging.ServiceBus.Amqp using System; using System.Collections; using System.Collections.Generic; - using System.Data; using System.IO; + using System.Linq; using System.Runtime.Serialization; + using Azure.Core; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Encoding; using Microsoft.Azure.Amqp.Framing; @@ -32,72 +33,123 @@ internal static class AmqpMessageConverter private const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset"; private const int GuidSize = 16; - public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable sbMessages) + /// The size, in bytes, to use as a buffer for stream operations. + private const int StreamBufferSizeInBytes = 512; + + public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable source) { - if (sbMessages == null) - { - throw Fx.Exception.ArgumentNull(nameof(sbMessages)); - } + Argument.AssertNotNull(source, nameof(source)); + return BuildAmqpBatchFromMessage(source); + } - AmqpMessage amqpMessage; + /// + /// Builds a batch from a set of + /// optionally propagating the custom properties. + /// + /// + /// The set of messages to use as the body of the batch message. + /// + /// The batch containing the source messages. + /// + private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable source) + { AmqpMessage firstAmqpMessage = null; SBMessage firstMessage = null; - List dataList = null; - var messageCount = 0; - foreach (var sbMessage in sbMessages) - { - messageCount++; - amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(sbMessage); - if (firstAmqpMessage == null) + AmqpMessage amqpMessage = BuildAmqpBatchFromMessages( + source.Select(sbMessage => { - firstAmqpMessage = amqpMessage; - firstMessage = sbMessage; - continue; - } - - if (dataList == null) - { - dataList = new List { ToData(firstAmqpMessage) }; - } - - dataList.Add(ToData(amqpMessage)); - } - - if (messageCount == 1 && firstAmqpMessage != null) - { - firstAmqpMessage.Batchable = true; - return firstAmqpMessage; - } - - amqpMessage = AmqpMessage.Create(dataList); - amqpMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + if (firstAmqpMessage == null) + { + firstAmqpMessage = SBMessageToAmqpMessage(sbMessage); + firstMessage = sbMessage; + return firstAmqpMessage; + } + else + { + return SBMessageToAmqpMessage(sbMessage); + } + })); - if (firstMessage.MessageId != null) + if (firstMessage != null && firstMessage.MessageId != null) { amqpMessage.Properties.MessageId = firstMessage.MessageId; } - if (firstMessage.SessionId != null) + if (firstMessage != null && firstMessage.SessionId != null) { amqpMessage.Properties.GroupId = firstMessage.SessionId; } - if (firstMessage.PartitionKey != null) + if (firstMessage != null && firstMessage.PartitionKey != null) { amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = firstMessage.PartitionKey; } - if (firstMessage.ViaPartitionKey != null) + if (firstMessage != null && firstMessage.ViaPartitionKey != null) { amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = firstMessage.ViaPartitionKey; } - amqpMessage.Batchable = true; return amqpMessage; } + /// + /// Builds a batch from a set of . + /// + /// + /// The set of messages to use as the body of the batch message. + /// + /// The batch containing the source messages. + /// + private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable source) + { + AmqpMessage batchEnvelope; + + var batchMessages = source.ToList(); + + if (batchMessages.Count == 1) + { + batchEnvelope = batchMessages[0]; + } + else + { + batchEnvelope = AmqpMessage.Create(batchMessages.Select(message => + { + message.Batchable = true; + using var messageStream = message.ToStream(); + return new Data { Value = ReadStreamToArraySegment(messageStream) }; + })); + + batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + } + + batchEnvelope.Batchable = true; + return batchEnvelope; + } + + /// + /// Converts a stream to an representation. + /// + /// + /// The stream to read and capture in memory. + /// + /// The containing the stream data. + /// + private static ArraySegment ReadStreamToArraySegment(Stream stream) + { + if (stream == null) + { + return new ArraySegment(); + } + + using var memStream = new MemoryStream(StreamBufferSizeInBytes); + stream.CopyTo(memStream, StreamBufferSizeInBytes); + + return new ArraySegment(memStream.ToArray()); + } + public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage) { var body = new ArraySegment((sbMessage.Body.IsEmpty) ? Array.Empty() : sbMessage.Body.ToArray()); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index 1d92a0bac2807..cd5a0a64ebe70 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -124,165 +124,254 @@ public AmqpSender( } /// - /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the - /// maximum size of a single batch, an exception will be triggered and the send will fail. + /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would + /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its + /// return value. + /// + /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when + /// attempting to send the message to the Queue/Topic. + /// + /// + /// The set of options to consider when creating this batch. + /// An optional instance to signal the request to cancel the operation. + /// + /// An with the requested . + /// + public override async ValueTask CreateBatchAsync( + CreateBatchOptions options, + CancellationToken cancellationToken) + { + TransportMessageBatch messageBatch = null; + Task createBatchTask = _retryPolicy.RunOperation(async (timeout) => + { + messageBatch = await CreateBatchInternal( + options, + timeout).ConfigureAwait(false); + }, + _entityName, + _connectionScope, + cancellationToken); + await createBatchTask.ConfigureAwait(false); + return messageBatch; + } + + internal async ValueTask CreateBatchInternal( + CreateBatchOptions options, + TimeSpan timeout) + { + Argument.AssertNotNull(options, nameof(options)); + + // Ensure that maximum message size has been determined; this depends on the underlying + // AMQP link, so if not set, requesting the link will ensure that it is populated. + + if (!MaximumMessageSize.HasValue) + { + await _sendLink.GetOrCreateAsync(timeout).ConfigureAwait(false); + } + + // Ensure that there was a maximum size populated; if none was provided, + // default to the maximum size allowed by the link. + + options.MaximumSizeInBytes ??= MaximumMessageSize; + + Argument.AssertInRange(options.MaximumSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes)); + return new AmqpMessageBatch(options); + } + + /// + /// Sends a set of messages to the associated Queue/Topic using a batched approach. /// /// - /// The set of event data to send. + /// The set of messages to send. + /// /// An optional instance to signal the request to cancel the operation. /// - public override async Task SendAsync(IEnumerable messages, - CancellationToken cancellationToken) + /// A task to be resolved on when the operation has completed. + /// + public override async Task SendBatchAsync( + ServiceBusMessageBatch messageBatch, + ServiceBusRetryPolicy retryPolicy, + CancellationToken cancellationToken) { - Argument.AssertNotNull(messages, nameof(messages)); + Argument.AssertNotNull(messageBatch, nameof(messageBatch)); Argument.AssertNotClosed(_closed, nameof(AmqpSender)); - AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages); - await SendAsync(messageFactory, cancellationToken).ConfigureAwait(false); + await retryPolicy.RunOperation(async (timeout) => + await SendBatchInternal( + messageBatch, + timeout, + cancellationToken).ConfigureAwait(false), + _entityName, + _connectionScope, + cancellationToken).ConfigureAwait(false); } /// - /// Closes the connection to the transport producer instance. + /// Sends a set of messages to the associated Queue/Topic using a batched approach. /// /// + /// + /// /// An optional instance to signal the request to cancel the operation. /// - public override async Task CloseAsync(CancellationToken cancellationToken) + internal virtual async Task SendBatchInternal( + ServiceBusMessageBatch messageBatch, + TimeSpan timeout, + CancellationToken cancellationToken) { - if (_closed) + var stopWatch = Stopwatch.StartNew(); + + AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageBatch.AsEnumerable()); + + using (AmqpMessage batchMessage = messageFactory()) { - return; - } + //ServiceBusEventSource.Log.SendStart(Entityname, messageHash); - _closed = true; + string messageHash = batchMessage.GetHashCode().ToString(); - var clientId = GetHashCode().ToString(); - var clientType = GetType(); + SendingAmqpLink link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false); - try - { - ServiceBusEventSource.Log.ClientCloseStart(clientType, _entityName, clientId); + // Validate that the message is not too large to send. This is done after the link is created to ensure + // that the maximum message size is known, as it is dictated by the service using the link. + + if (batchMessage.SerializedMessageSize > MaximumMessageSize) + { + throw new ServiceBusException(_entityName, string.Format(Resources1.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaximumMessageSize), ServiceBusException.FailureReason.MessageSizeExceeded); + } + + // Attempt to send the message batch. + + var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount))); + var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); - if (_sendLink?.TryGetOpenedObject(out var _) == true) + if (outcome.DescriptorCode != Accepted.Code) { - cancellationToken.ThrowIfCancellationRequested(); - await _sendLink.CloseAsync().ConfigureAwait(false); + throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName); } - _sendLink?.Dispose(); - } - catch (Exception ex) - { - _closed = false; - ServiceBusEventSource.Log.ClientCloseError(clientType, _entityName, clientId, ex.Message); + //ServiceBusEventSource.Log.SendStop(Entityname, messageHash); - throw; - } - finally - { - ServiceBusEventSource.Log.ClientCloseComplete(clientType, _entityName, clientId); + cancellationToken.ThrowIfCancellationRequested(); + stopWatch.Stop(); } } /// - /// Sends an AMQP message that contains a batch of events to the associated Service Bus entity. If the size of events exceed the - /// maximum size of a single batch, an exception will be triggered and the send will fail. + /// Sends a message to the associated Service Bus entity. /// /// - /// A factory which can be used to produce an AMQP message containing the batch of events to be sent. + /// A message to send. + /// /// An optional instance to signal the request to cancel the operation. /// - protected virtual async Task SendAsync(Func messageFactory, - CancellationToken cancellationToken) + public override async Task SendAsync( + ServiceBusMessage message, + ServiceBusRetryPolicy retryPolicy, + CancellationToken cancellationToken) { - var failedAttemptCount = 0; - var stopWatch = Stopwatch.StartNew(); + Argument.AssertNotNull(message, nameof(message)); + Argument.AssertNotClosed(_closed, nameof(AmqpSender)); - SendingAmqpLink link; + await retryPolicy.RunOperation(async (timeout) => + await SendInternal( + message, + timeout, + cancellationToken).ConfigureAwait(false), + _entityName, + _connectionScope, + cancellationToken).ConfigureAwait(false); + } - try + /// + /// Sends a message to the associated Service Bus entity. + /// + /// + /// + /// + /// An optional instance to signal the request to cancel the operation. + /// + internal virtual async Task SendInternal( + ServiceBusMessage message, + TimeSpan timeout, + CancellationToken cancellationToken) + { + var stopWatch = Stopwatch.StartNew(); + using (AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message)) { - var tryTimeout = _retryPolicy.CalculateTryTimeout(0); - while (!cancellationToken.IsCancellationRequested) + //ServiceBusEventSource.Log.SendStart(Entityname, messageHash); + + string messageHash = amqpMessage.GetHashCode().ToString(); + + SendingAmqpLink link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false); + + // Validate that the message is not too large to send. This is done after the link is created to ensure + // that the maximum message size is known, as it is dictated by the service using the link. + + if (amqpMessage.SerializedMessageSize > MaximumMessageSize) { - try - { - using AmqpMessage batchMessage = messageFactory(); - string messageHash = batchMessage.GetHashCode().ToString(); + throw new ServiceBusException(_entityName, string.Format(Resources1.MessageSizeExceeded, messageHash, amqpMessage.SerializedMessageSize, MaximumMessageSize), ServiceBusException.FailureReason.MessageSizeExceeded); + } - //ServiceBusEventSource.Log.EventPublishStart(EventHubName, logPartition, messageHash); + // Attempt to send the message batch. - link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); + var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount))); + var outcome = await link.SendMessageAsync(amqpMessage, deliveryTag, AmqpConstants.NullBinary, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); - // Validate that the batch of messages is not too large to send. This is done after the link is created to ensure - // that the maximum message size is known, as it is dictated by the service using the link. + if (outcome.DescriptorCode != Accepted.Code) + { + throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName); + } - if (batchMessage.SerializedMessageSize > MaximumMessageSize) - { - throw new ServiceBusException(_entityName, string.Format(Resources1.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaximumMessageSize), ServiceBusException.FailureReason.MessageSizeExceeded); - } + //ServiceBusEventSource.Log.SendStop(Entityname, messageHash); - // Attempt to send the message batch. + cancellationToken.ThrowIfCancellationRequested(); + stopWatch.Stop(); + } + } - var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount))); - var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, tryTimeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); + /// + /// Closes the connection to the transport sender instance. + /// + /// + /// An optional instance to signal the request to cancel the operation. + /// + public override async Task CloseAsync(CancellationToken cancellationToken) + { + if (_closed) + { + return; + } - if (outcome.DescriptorCode != Accepted.Code) - { - throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName); - } + _closed = true; - // The send operation should be considered successful; return to - // exit the retry loop. + var clientId = GetHashCode().ToString(); + var clientType = GetType(); - return; - } - catch (Exception ex) - { - Exception activeEx = ex.TranslateServiceException(_entityName); - - // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop. - // Otherwise, bubble the exception. - - ++failedAttemptCount; - TimeSpan? retryDelay = _retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount); - - if (retryDelay.HasValue && !_connectionScope.IsDisposed && !cancellationToken.IsCancellationRequested) - { - //ServiceBusEventSource.Log.EventPublishError(EventHubName, messageHash, activeEx.Message); - await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false); - - tryTimeout = _retryPolicy.CalculateTryTimeout(failedAttemptCount); - stopWatch.Reset(); - } - else if (ex is AmqpException) - { - throw activeEx; - } - else - { - throw; - } - } - } + try + { + ServiceBusEventSource.Log.ClientCloseStart(clientType, _entityName, clientId); + cancellationToken.ThrowIfCancellationRequested(); - // If no value has been returned nor exception thrown by this point, - // then cancellation has been requested. + if (_sendLink?.TryGetOpenedObject(out var _) == true) + { + cancellationToken.ThrowIfCancellationRequested(); + await _sendLink.CloseAsync().ConfigureAwait(false); + } - throw new TaskCanceledException(); + _sendLink?.Dispose(); } - catch (Exception) + catch (Exception ex) { - //ServiceBusEventSource.Log.EventPublishError(EventHubName, logPartition, messageHash, ex.Message); + _closed = false; + ServiceBusEventSource.Log.ClientCloseError(clientType, _entityName, clientId, ex.Message); + throw; } finally { - stopWatch.Stop(); - //ServiceBusEventSource.Log.EventPublishComplete(EventHubName, logPartition, messageHash); + ServiceBusEventSource.Log.ClientCloseComplete(clientType, _entityName, clientId); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs new file mode 100644 index 0000000000000..a451bbbe3d22c --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; + +namespace Azure.Messaging.ServiceBus.Core +{ + /// + /// Provides an abstraction for generalizing a batch of messages so that a dedicated instance may provide operations + /// for a specific transport, such as AMQP or JMS. It is intended that the public employ + /// a transport batch via containment and delegate operations to it rather than understanding protocol-specific details + /// for different transports. + /// + /// + internal abstract class TransportMessageBatch : IDisposable + { + /// + /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as + /// well as any overhead for the batch itself when sent to the Queue/Topic. + /// + /// + public abstract long MaximumSizeInBytes { get; } + + /// + /// The size of the batch, in bytes, as it will be sent to the Queue/Topic + /// + /// + public abstract long SizeInBytes { get; } + + /// + /// The count of messages contained in the batch. + /// + /// + public abstract int Count { get; } + + /// + /// Attempts to add a message to the batch, ensuring that the size + /// of the batch does not exceed its maximum. + /// + /// + /// The message to attempt to add to the batch. + /// + /// true if the message was added; otherwise, false. + /// + public abstract bool TryAdd(ServiceBusMessage message); + + /// + /// Represents the batch as an enumerable set of transport-specific + /// representations of a message. + /// + /// + /// The transport-specific message representation being requested. + /// + /// The set of messages as an enumerable of the requested type. + /// + public abstract IEnumerable AsEnumerable(); + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// + public abstract void Dispose(); + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs index 3cff6ab6db0af..09f9cf8db0ba9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs @@ -28,15 +28,50 @@ internal abstract class TransportSender public virtual bool IsClosed { get; } /// - /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the - /// maximum size of a single batch, an exception will be triggered and the send will fail. + /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would + /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its + /// return value. + /// + /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when + /// attempting to send the message to the Queue/Topic. /// /// - /// The set of event data to send. + /// The set of options to consider when creating this batch. + /// An optional instance to signal the request to cancel the operation. + /// + /// An with the requested . + /// + /// + /// + public abstract ValueTask CreateBatchAsync( + CreateBatchOptions options, + CancellationToken cancellationToken); + /// + /// Sends a message to the associated Service Bus entity. + /// + /// + /// A message to send. + /// /// An optional instance to signal the request to cancel the operation. /// public abstract Task SendAsync( - IEnumerable messages, + ServiceBusMessage message, + ServiceBusRetryPolicy retryPolicy, + CancellationToken cancellationToken); + + /// + /// Sends a set of messages to the associated Queue/Topic using a batched approach. + /// + /// + /// The set of messages to send. + /// + /// An optional instance to signal the request to cancel the operation. + /// + /// A task to be resolved on when the operation has completed. + /// + public abstract Task SendBatchAsync( + ServiceBusMessageBatch messageBatch, + ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken); /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs new file mode 100644 index 0000000000000..1350e1c58befa --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ComponentModel; +using Azure.Core; + +namespace Azure.Messaging.ServiceBus +{ + + /// + /// The set of options that can be specified to influence the way in which an service bus message batch + /// behaves and is sent to the Queue/Topic. + /// + /// + public class CreateBatchOptions + { + /// The requested maximum size to allow for the batch, in bytes. + private long? _maximumSizeInBytes = null; + + /// + /// The maximum size to allow for a single batch of messages, in bytes. + /// + /// + /// + /// The desired limit, in bytes, for the size of the associated service bus message batch. If null, + /// the maximum size allowed by the active transport will be used. + /// + /// + public long? MaximumSizeInBytes + { + get => _maximumSizeInBytes; + + set + { + if (value.HasValue) + { + Argument.AssertAtLeast(value.Value, ServiceBusSender.MinimumBatchSizeLimit, nameof(MaximumSizeInBytes)); + } + + _maximumSizeInBytes = value; + } + } + + /// + /// Determines whether the specified is equal to this instance. + /// + /// + /// The to compare with this instance. + /// + /// true if the specified is equal to this instance; otherwise, false. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override bool Equals(object obj) => base.Equals(obj); + + /// + /// Returns a hash code for this instance. + /// + /// + /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override int GetHashCode() => base.GetHashCode(); + + /// + /// Converts the instance to string representation. + /// + /// + /// A that represents this instance. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override string ToString() => base.ToString(); + + /// + /// Creates a new copy of the current , cloning its attributes into a new instance. + /// + /// + /// A new copy of . + /// + internal CreateBatchOptions Clone() => + new CreateBatchOptions + { + _maximumSizeInBytes = MaximumSizeInBytes + }; + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs index db836a35e4d38..32959b5fb5a8d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs @@ -2,124 +2,95 @@ // Licensed under the MIT License. using System; -using System.Collections; using System.Collections.Generic; using Azure.Core; +using Azure.Messaging.ServiceBus.Core; namespace Azure.Messaging.ServiceBus { /// /// A set of with size constraints known up-front, - /// intended to be sent to the Event Hubs service as a single batch. + /// intended to be sent to the Queue/Topic as a single batch. /// /// - public sealed class ServiceBusMessageBatch : IDisposable, IEnumerable + public sealed class ServiceBusMessageBatch : IDisposable { /// - /// The maximum size allowed for the batch, in bytes. This includes the events in the batch as - /// well as any overhead for the batch itself when sent to the Event Hubs service. + /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as + /// well as any overhead for the batch itself when sent to the Queue/Topic. /// /// - public long MaximumSizeInBytes { get; } + public long MaximumSizeInBytes => InnerBatch.MaximumSizeInBytes; /// - /// The size of the batch, in bytes, as it will be sent to the Event Hubs - /// service. + /// The size of the batch, in bytes, as it will be sent to the Queue/Topic. /// /// - public long SizeInBytes { get; } + public long SizeInBytes => InnerBatch.SizeInBytes; /// - /// The count of events contained in the batch. + /// The count of messages contained in the batch. /// /// - public int Count { get; } + public int Count => InnerBatch.Count; - ///// - ///// The set of options that should be used when publishing the batch. - ///// - ///// - //internal SendEventOptions SendOptions { get; } - - ///// - ///// The transport-specific batch responsible for performing the batch operations - ///// in a manner compatible with the associated . - ///// - ///// - //private TransportEventBatch InnerBatch { get; } + /// + /// The transport-specific batch responsible for performing the batch operations + /// in a manner compatible with the associated . + /// + /// + private TransportMessageBatch InnerBatch { get; } - ///// - ///// Initializes a new instance of the class. - ///// - ///// - ///// The transport-specific batch responsible for performing the batch operations. - ///// The set of options that should be used when publishing the batch. - ///// - ///// - ///// As an internal type, this class performs only basic sanity checks against its arguments. It - ///// is assumed that callers are trusted and have performed deep validation. - ///// - ///// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose; - ///// creation of clones or otherwise protecting the parameters is assumed to be the purview of the - ///// caller. - ///// - ///// - //internal EventDataBatch(TransportEventBatch transportBatch, - // SendEventOptions sendOptions) - //{ - // Argument.AssertNotNull(transportBatch, nameof(transportBatch)); - // Argument.AssertNotNull(sendOptions, nameof(sendOptions)); + /// + /// Initializes a new instance of the class. + /// + /// + /// The transport-specific batch responsible for performing the batch operations. + /// + /// + /// As an internal type, this class performs only basic sanity checks against its arguments. It + /// is assumed that callers are trusted and have performed deep validation. + /// + /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose; + /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the + /// caller. + /// + /// + internal ServiceBusMessageBatch(TransportMessageBatch transportBatch) + { + Argument.AssertNotNull(transportBatch, nameof(transportBatch)); - // InnerBatch = transportBatch; - // SendOptions = sendOptions; - //} + InnerBatch = transportBatch; + } /// - /// Attempts to add an event to the batch, ensuring that the size + /// Attempts to add a message to the batch, ensuring that the size /// of the batch does not exceed its maximum. /// /// - /// The event to attempt to add to the batch. + /// Message to attempt to add to the batch. /// - /// true if the event was added; otherwise, false. + /// true if the message was added; otherwise, false. /// public bool TryAdd(ServiceBusMessage message) { - return true; - //bool instrumented = EventDataInstrumentation.InstrumentEvent(eventData); - //bool added = InnerBatch.TryAdd(eventData); - - //if (!added && instrumented) - //{ - // EventDataInstrumentation.ResetEvent(eventData); - //} - - //return added; + return InnerBatch.TryAdd(message); } /// /// Performs the task needed to clean up resources used by the . /// /// - public void Dispose() { } + public void Dispose() => InnerBatch.Dispose(); /// - /// + /// Represents the batch as an enumerable set of specific representations of a message. /// - /// - public IEnumerator GetEnumerator() - { - throw new NotImplementedException(); - } - - ///// - ///// Represents the batch as an enumerable set of specific representations of an event. - ///// - ///// - ///// The specific event representation being requested. - ///// - ///// The set of events as an enumerable of the requested type. - ///// - //internal IEnumerable AsEnumerable() => InnerBatch.AsEnumerable(); + /// + /// The specific message representation being requested. + /// + /// The set of messages as an enumerable of the requested type. + /// + internal IEnumerable AsEnumerable() => InnerBatch.AsEnumerable(); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs index f633d7e182d9a..8db1ad6279333 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs @@ -23,6 +23,9 @@ namespace Azure.Messaging.ServiceBus /// public class ServiceBusSender : IAsyncDisposable { + /// The minimum allowable size, in bytes, for a batch to be sent. + internal const int MinimumBatchSizeLimit = 24; + /// /// The fully qualified Service Bus namespace that the producer is associated with. This is likely /// to be similar to {yournamespace}.servicebus.windows.net. @@ -120,54 +123,82 @@ protected ServiceBusSender() } /// - /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the - /// maximum size of a single batch, an exception will be triggered and the send will fail. + /// Sends a message to the associated entity of Service Bus. /// /// - /// The set of event data to send. + /// A messsage to send. /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// - /// - /// public virtual async Task SendAsync( ServiceBusMessage message, CancellationToken cancellationToken = default) { Argument.AssertNotNull(message, nameof(message)); - await SendBatchAsync(new ServiceBusMessage[]{message}, cancellationToken).ConfigureAwait(false); + await _innerSender.SendAsync(message, _retryPolicy, cancellationToken).ConfigureAwait(false); + } + + /// + /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would + /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its + /// return value. + /// + /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when + /// attempting to send the messages to the Queue/Topic. + /// + /// + /// An optional instance to signal the request to cancel the operation. + /// + /// An with the default batch options. + /// + /// + /// + public virtual ValueTask CreateBatchAsync(CancellationToken cancellationToken = default) => CreateBatchAsync(null, cancellationToken); + + /// + /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would + /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its + /// return value. + /// + /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when + /// attempting to send the messages to the Queue/Topic. + /// + /// + /// The set of options to consider when creating this batch. + /// An optional instance to signal the request to cancel the operation. + /// + /// An with the requested . + /// + /// + /// + public virtual async ValueTask CreateBatchAsync( + CreateBatchOptions options, + CancellationToken cancellationToken = default) + { + options = options?.Clone() ?? new CreateBatchOptions(); + + TransportMessageBatch transportBatch = await _innerSender.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false); + return new ServiceBusMessageBatch(transportBatch); } /// - /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the + /// Sends a set of messages to the associated Service Bus entity using a batched approach. If the size of messages exceed the /// maximum size of a single batch, an exception will be triggered and the send will fail. /// /// - /// The set of event data to send. + /// The set of messages to send. A batch may be created using . /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// - /// - /// - public virtual async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) => - await SendRangeInternal(messages, cancellationToken).ConfigureAwait(false); - - ///// - ///// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the - ///// maximum size of a single batch, an exception will be triggered and the send will fail. - ///// - ///// - ///// The set of event data to send. - ///// An optional instance to signal the request to cancel the operation. - ///// - ///// A task to be resolved on when the operation has completed. - ///// - ///// - ///// - //public virtual async Task SendBatchAsync(ServiceBusMessageBatch messages, CancellationToken cancellationToken = default) => - // await SendRangeInternal(new ServiceBusMessage[] { }, cancellationToken).ConfigureAwait(false); + public virtual async Task SendBatchAsync( + ServiceBusMessageBatch messageBatch, + CancellationToken cancellationToken = default) + { + Argument.AssertNotNull(messageBatch, nameof(messageBatch)); + await _innerSender.SendBatchAsync(messageBatch, _retryPolicy, cancellationToken).ConfigureAwait(false); + } /// /// Schedules a message to appear on Service Bus at a later time. @@ -198,39 +229,6 @@ public virtual async Task CancelScheduledMessageAsync(long sequenceNumber, Cance await _innerSender.CancelScheduledMessageAsync(sequenceNumber, _retryPolicy, cancellationToken).ConfigureAwait(false); } - /// - /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the - /// maximum size of a single batch, an exception will be triggered and the send will fail. - /// - /// - /// The set of event data to send. - /// An optional instance to signal the request to cancel the operation. - /// - /// A task to be resolved on when the operation has completed. - /// - - /// - internal virtual async Task SendRangeInternal( - IEnumerable messages, - CancellationToken cancellationToken) - { - Argument.AssertNotNull(messages, nameof(messages)); - - using DiagnosticScope scope = CreateDiagnosticScope(); - messages = messages.ToList(); - InstrumentMessages(messages); - - try - { - await _innerSender.SendAsync(messages, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - scope.Failed(ex); - throw; - } - } - /// /// Closes the producer. /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs index 3139dbc394098..e98fa9febe8a1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs @@ -30,7 +30,15 @@ public async Task Receive_Event(int numThreads) // use double the number of threads so we can make sure we test that we don't // retrieve more messages than expected when there are more messages available - await sender.SendBatchAsync(GetMessages(numThreads * 2)); + IEnumerable messages = GetMessages(numThreads * 2); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); + await using var processor = client.GetProcessor(scope.QueueName); int messageCt = 0; @@ -83,7 +91,14 @@ public async Task Receive_StopProcessing(int numThreads) await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); ServiceBusSender sender = client.GetSender(scope.QueueName); int numMessages = 50; - await sender.SendBatchAsync(GetMessages(numMessages)); + IEnumerable messages = GetMessages(numMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); await using var processor = client.GetProcessor(scope.QueueName); int messageProcessedCt = 0; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs index 3c1a5502e895a..e3a91ffc1608a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs @@ -24,7 +24,13 @@ public async Task Peek() var messageCt = 10; IEnumerable sentMessages = GetMessages(messageCt); - await sender.SendBatchAsync(sentMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); await using var receiver = client.GetReceiver(scope.QueueName); @@ -54,7 +60,13 @@ public async Task ReceiveMessagesInPeekLockMode() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); var receivedMessageCount = 0; @@ -87,7 +99,13 @@ public async Task CompleteMessages() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); var receivedMessageCount = 0; @@ -102,8 +120,8 @@ public async Task CompleteMessages() } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); } } @@ -116,7 +134,13 @@ public async Task AbandonMessages() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); var receivedMessageCount = 0; @@ -153,7 +177,13 @@ public async Task DeadLetterMessages() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); var receivedMessageCount = 0; @@ -168,8 +198,8 @@ public async Task DeadLetterMessages() } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); messageEnum.Reset(); receivedMessageCount = 0; @@ -199,7 +229,13 @@ public async Task DeferMessages() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); var receivedMessageCount = 0; @@ -236,7 +272,13 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var clientOptions = new ServiceBusReceiverOptions() { @@ -254,8 +296,8 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); } } @@ -290,30 +332,27 @@ public async Task RenewMessageLock() await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 1; - IEnumerable messages = GetMessages(messageCount); - await sender.SendBatchAsync(messages); + ServiceBusMessage message = GetMessage(); + await sender.SendAsync(message); var receiver = client.GetReceiver(scope.QueueName); ServiceBusReceivedMessage[] receivedMessages = (await receiver.ReceiveBatchAsync(messageCount)).ToArray(); - var message = receivedMessages.First(); - var firstLockedUntilUtcTime = message.LockedUntilUtc; + var receivedMessage = receivedMessages.First(); + var firstLockedUntilUtcTime = receivedMessage.LockedUntilUtc; // Sleeping for 10 seconds... await Task.Delay(10000); - await receiver.RenewLockAsync(message); + await receiver.RenewLockAsync(receivedMessage); - Assert.True(message.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10)); + Assert.True(receivedMessage.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10)); // Complete Messages - await receiver.CompleteAsync(message); - - var messageEnum = messages.GetEnumerator(); - messageEnum.MoveNext(); + await receiver.CompleteAsync(receivedMessage); Assert.AreEqual(messageCount, receivedMessages.Length); - Assert.AreEqual(messageEnum.Current.MessageId, message.MessageId); + Assert.AreEqual(message.MessageId, receivedMessage.MessageId); var peekedMessage = receiver.PeekAsync(); Assert.IsNull(peekedMessage.Result); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs index 0eb161a32b277..3963999c5e309 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs @@ -2,7 +2,9 @@ // Licensed under the MIT License. using System; +using System.Collections.Generic; using System.Net; +using System.Text; using System.Threading.Tasks; using Azure.Identity; using Azure.Messaging.ServiceBus; @@ -20,7 +22,7 @@ public async Task Send_ConnString() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var sender = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString).GetSender(scope.QueueName); - await sender.SendBatchAsync(GetMessages(10)); + await sender.SendAsync(GetMessage()); } } @@ -88,6 +90,71 @@ public async Task Send_Topic_Session() } } + [Test] + public async Task SenderCanSendAMessageBatch() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.GetSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("This is a message"))); + batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("This is another message"))); + batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("So many messages"))); + + await sender.SendBatchAsync(batch); + } + } + + [Test] + public async Task SenderCanSendZeroLengthMessageBatch() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.GetSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + batch.TryAdd(new ServiceBusMessage(Array.Empty())); + + await sender.SendBatchAsync(batch); + } + } + + [Test] + public async Task SenderCanSendLargeMessageBatch() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.GetSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + // Actual limit is 262144 bytes for a single message. + batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3])); + batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3])); + batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3])); + + await sender.SendBatchAsync(batch); + } + } + + [Test] + public async Task SenderCannotSendLargerThanMaximumSize() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.GetSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + // Actual limit is 262144 bytes for a single message. + ServiceBusMessage message = new ServiceBusMessage(new byte[300000]); + + Assert.That(async () => await sender.SendAsync(message), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusException.FailureReason.MessageSizeExceeded)); + } + } + [Test] public async Task ClientProperties() { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs index f8525bc89543d..3a620b007897f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs @@ -26,22 +26,22 @@ public void Send_NullShouldThrow() Assert.ThrowsAsync(async () => await mock.Object.SendAsync(null)); } - [Test] - public async Task Send_DelegatesToSendRange() - { - var mock = new Mock() - { - CallBase = true - }; - mock - .Setup(m => m.SendBatchAsync( - It.Is>(value => value.Count() == 1), - It.IsAny())) - .Returns(Task.CompletedTask) - .Verifiable("The single send should delegate to the batch send."); + //[Test] + //public async Task Send_DelegatesToSendRange() + //{ + // var mock = new Mock() + // { + // CallBase = true + // }; + // mock + // .Setup(m => m.SendBatchAsync( + // It.Is>(value => value.Count() == 1), + // It.IsAny())) + // .Returns(Task.CompletedTask) + // .Verifiable("The single send should delegate to the batch send."); - await mock.Object.SendAsync(new ServiceBusMessage()); - } + // await mock.Object.SendAsync(new ServiceBusMessage()); + //} [Test] public void SendRange_NullShouldThrow() diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs index e75a1f80dd0d3..22e9cc76d6144 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs @@ -34,7 +34,13 @@ public async Task Peek_Session(long? sequenceNumber, string partitionKey) // send the messages IEnumerable sentMessages = GetMessages(messageCt, sessionId, partitionKey); - await sender.SendBatchAsync(sentMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); Dictionary sentMessageIdToMsg = new Dictionary(); foreach (ServiceBusMessage message in sentMessages) { @@ -82,7 +88,13 @@ public async Task Lock_Same_Session_Should_Throw() var sessionId = Guid.NewGuid().ToString(); // send the messages IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendBatchAsync(sentMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var options = new ServiceBusReceiverOptions { RetryOptions = new ServiceBusRetryOptions @@ -120,7 +132,13 @@ public async Task PeekRange_IncrementsSequenceNumber(int messageCt, int peekCt) var sessionId = Guid.NewGuid().ToString(); // send the messages IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendBatchAsync(sentMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); long seq = 0; @@ -152,7 +170,13 @@ public async Task Peek_IncrementsSequenceNmber(int messageCt) var sessionId = Guid.NewGuid().ToString(); // send the messages IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendBatchAsync(sentMessages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); @@ -180,11 +204,17 @@ public async Task RoundRobinSessions() var messageCt = 10; HashSet sessions = new HashSet() { "1", "2", "3" }; - + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); // send the messages foreach (string session in sessions) { - await sender.SendBatchAsync(GetMessages(messageCt, session)); + var sentMessages = GetMessages(messageCt, session); + + foreach (ServiceBusMessage message in sentMessages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); } // create receiver not scoped to a specific session @@ -217,7 +247,13 @@ public async Task ReceiveMessagesInPeekLockMode() var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); @@ -255,7 +291,13 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var clientOptions = new ServiceBusReceiverOptions() { @@ -279,8 +321,8 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); } } @@ -297,7 +339,13 @@ public async Task CompleteMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync( scope.QueueName, @@ -315,8 +363,8 @@ public async Task CompleteMessages(bool useSpecificSession) } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); } } @@ -333,7 +381,13 @@ public async Task AbandonMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync( scope.QueueName, @@ -377,7 +431,13 @@ public async Task DeadLetterMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = await client.GetSessionReceiverAsync( scope.QueueName, @@ -395,8 +455,8 @@ public async Task DeadLetterMessages(bool useSpecificSession) } Assert.AreEqual(messageCount, receivedMessageCount); - var message = receiver.PeekAsync(); - Assert.IsNull(message.Result); + var peekedMessage = receiver.PeekAsync(); + Assert.IsNull(peekedMessage.Result); // TODO: System.InvalidOperationException : Cannot create a MessageSession for a sub-queue. @@ -433,7 +493,13 @@ public async Task DeferMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; IEnumerable messages = GetMessages(messageCount, sessionId); - await sender.SendBatchAsync(messages); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + foreach (ServiceBusMessage message in messages) + { + Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + await sender.SendBatchAsync(batch); var receiver = await client.GetSessionReceiverAsync( scope.QueueName, @@ -473,11 +539,11 @@ public async Task RenewSessionLock(bool isSessionSpecified) ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 1; var sessionId1 = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId1); + ServiceBusMessage message = GetMessage(sessionId1); // send another session message before the one we are interested in to make sure that when isSessionSpecified=true, it is being respected - await sender.SendBatchAsync(GetMessages(messageCount, "sessionId2")); - await sender.SendBatchAsync(messages); + await sender.SendAsync(GetMessage("sessionId2")); + await sender.SendAsync(message); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName, sessionId: isSessionSpecified ? sessionId1 : null); if (isSessionSpecified) @@ -486,7 +552,7 @@ public async Task RenewSessionLock(bool isSessionSpecified) } ServiceBusReceivedMessage[] receivedMessages = (await receiver.ReceiveBatchAsync(messageCount)).ToArray(); - var message = receivedMessages.First(); + var receivedMessage = receivedMessages.First(); var firstLockedUntilUtcTime = receiver.SessionManager.LockedUntilUtc; // Sleeping for 10 seconds... @@ -497,15 +563,12 @@ public async Task RenewSessionLock(bool isSessionSpecified) Assert.True(receiver.SessionManager.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10)); // Complete Messages - await receiver.CompleteAsync(message); - - var messageEnum = messages.GetEnumerator(); - messageEnum.MoveNext(); + await receiver.CompleteAsync(receivedMessage); Assert.AreEqual(messageCount, receivedMessages.Length); if (isSessionSpecified) { - Assert.AreEqual(messageEnum.Current.MessageId, message.MessageId); + Assert.AreEqual(message.MessageId, receivedMessage.MessageId); } var peekedMessage = receiver.PeekAsync(); From a1fd2187215cc0403665c02e9faabf7c42cadbaa Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Thu, 5 Mar 2020 14:03:30 -0800 Subject: [PATCH 2/3] Address PR comments --- .../src/Amqp/AmqpMessageConverter.cs | 53 ++++++------ .../src/Amqp/AmqpSender.cs | 39 ++++----- .../src/Core/TransportSender.cs | 8 -- .../src/Sender/ServiceBusSender.cs | 8 +- .../tests/ProcessorLiveTests.cs | 20 +---- .../tests/ReceiverLiveTests.cs | 64 +++++--------- .../tests/SenderLiveTests.cs | 26 ++++-- .../tests/ServiceBusTestBase.cs | 10 +++ .../tests/SessionReceiverLiveTests.cs | 83 ++++--------------- 9 files changed, 117 insertions(+), 194 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index 7f8fa00f41760..6e0e54da24d62 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -56,7 +56,7 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable sour AmqpMessage firstAmqpMessage = null; SBMessage firstMessage = null; - AmqpMessage amqpMessage = BuildAmqpBatchFromMessages( + return BuildAmqpBatchFromMessages( source.Select(sbMessage => { if (firstAmqpMessage == null) @@ -69,30 +69,7 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable sour { return SBMessageToAmqpMessage(sbMessage); } - })); - - if (firstMessage != null && firstMessage.MessageId != null) - { - amqpMessage.Properties.MessageId = firstMessage.MessageId; - } - if (firstMessage != null && firstMessage.SessionId != null) - { - amqpMessage.Properties.GroupId = firstMessage.SessionId; - } - - if (firstMessage != null && firstMessage.PartitionKey != null) - { - amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = - firstMessage.PartitionKey; - } - - if (firstMessage != null && firstMessage.ViaPartitionKey != null) - { - amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = - firstMessage.ViaPartitionKey; - } - - return amqpMessage; + }), firstMessage); } /// @@ -100,10 +77,13 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable sour /// /// /// The set of messages to use as the body of the batch message. + /// /// /// The batch containing the source messages. /// - private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable source) + private static AmqpMessage BuildAmqpBatchFromMessages( + IEnumerable source, + SBMessage firstMessage = null) { AmqpMessage batchEnvelope; @@ -125,6 +105,27 @@ private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable s batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; } + if (firstMessage?.MessageId != null) + { + batchEnvelope.Properties.MessageId = firstMessage.MessageId; + } + if (firstMessage?.SessionId != null) + { + batchEnvelope.Properties.GroupId = firstMessage.SessionId; + } + + if (firstMessage?.PartitionKey != null) + { + batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = + firstMessage.PartitionKey; + } + + if (firstMessage?.ViaPartitionKey != null) + { + batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = + firstMessage.ViaPartitionKey; + } + batchEnvelope.Batchable = true; return batchEnvelope; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index cd5a0a64ebe70..0b1d685f0b95b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -144,7 +144,7 @@ public override async ValueTask CreateBatchAsync( TransportMessageBatch messageBatch = null; Task createBatchTask = _retryPolicy.RunOperation(async (timeout) => { - messageBatch = await CreateBatchInternal( + messageBatch = await CreateBatchInternalAsync( options, timeout).ConfigureAwait(false); }, @@ -155,7 +155,7 @@ public override async ValueTask CreateBatchAsync( return messageBatch; } - internal async ValueTask CreateBatchInternal( + internal async ValueTask CreateBatchInternalAsync( CreateBatchOptions options, TimeSpan timeout) { @@ -183,21 +183,19 @@ internal async ValueTask CreateBatchInternal( /// /// /// The set of messages to send. - /// /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// public override async Task SendBatchAsync( ServiceBusMessageBatch messageBatch, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken) { Argument.AssertNotNull(messageBatch, nameof(messageBatch)); Argument.AssertNotClosed(_closed, nameof(AmqpSender)); - await retryPolicy.RunOperation(async (timeout) => - await SendBatchInternal( + await _retryPolicy.RunOperation(async (timeout) => + await SendBatchInternalAsync( messageBatch, timeout, cancellationToken).ConfigureAwait(false), @@ -214,7 +212,7 @@ await SendBatchInternal( /// /// An optional instance to signal the request to cancel the operation. /// - internal virtual async Task SendBatchInternal( + internal virtual async Task SendBatchInternalAsync( ServiceBusMessageBatch messageBatch, TimeSpan timeout, CancellationToken cancellationToken) @@ -262,19 +260,17 @@ internal virtual async Task SendBatchInternal( /// /// /// A message to send. - /// /// An optional instance to signal the request to cancel the operation. /// public override async Task SendAsync( ServiceBusMessage message, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken) { Argument.AssertNotNull(message, nameof(message)); Argument.AssertNotClosed(_closed, nameof(AmqpSender)); - await retryPolicy.RunOperation(async (timeout) => - await SendInternal( + await _retryPolicy.RunOperation(async (timeout) => + await SendInternalAsync( message, timeout, cancellationToken).ConfigureAwait(false), @@ -291,7 +287,7 @@ await SendInternal( /// /// An optional instance to signal the request to cancel the operation. /// - internal virtual async Task SendInternal( + internal virtual async Task SendInternalAsync( ServiceBusMessage message, TimeSpan timeout, CancellationToken cancellationToken) @@ -379,18 +375,16 @@ public override async Task CloseAsync(CancellationToken cancellationToken) /// /// /// - /// /// /// public override async Task ScheduleMessageAsync( ServiceBusMessage message, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken = default) { long sequenceNumber = 0; - Task scheduleTask = retryPolicy.RunOperation(async (timeout) => + Task scheduleTask = _retryPolicy.RunOperation(async (timeout) => { - sequenceNumber = await ScheduleMessageInternal( + sequenceNumber = await ScheduleMessageInternalAsync( message, timeout, cancellationToken).ConfigureAwait(false); @@ -409,7 +403,7 @@ public override async Task ScheduleMessageAsync( /// /// /// - internal async Task ScheduleMessageInternal( + internal async Task ScheduleMessageInternalAsync( ServiceBusMessage message, TimeSpan timeout, CancellationToken cancellationToken = default) @@ -494,19 +488,16 @@ internal async Task ScheduleMessageInternal( /// /// /// - /// /// /// public override async Task CancelScheduledMessageAsync( long sequenceNumber, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken = default) { - Task cancelMessageTask = retryPolicy.RunOperation(async (timeout) => + Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout) => { - await CancelScheduledMessageInternal( + await CancelScheduledMessageInternalAsync( sequenceNumber, - retryPolicy, timeout, cancellationToken).ConfigureAwait(false); }, @@ -520,13 +511,11 @@ await CancelScheduledMessageInternal( /// /// /// - /// /// /// /// - internal async Task CancelScheduledMessageInternal( + internal async Task CancelScheduledMessageInternalAsync( long sequenceNumber, - ServiceBusRetryPolicy retryPolicy, TimeSpan timeout, CancellationToken cancellationToken = default) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs index 09f9cf8db0ba9..acac87540f5f1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs @@ -51,12 +51,10 @@ public abstract ValueTask CreateBatchAsync( /// /// /// A message to send. - /// /// An optional instance to signal the request to cancel the operation. /// public abstract Task SendAsync( ServiceBusMessage message, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken); /// @@ -64,38 +62,32 @@ public abstract Task SendAsync( /// /// /// The set of messages to send. - /// /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. /// public abstract Task SendBatchAsync( ServiceBusMessageBatch messageBatch, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken); /// /// /// /// - /// /// /// public abstract Task ScheduleMessageAsync( ServiceBusMessage message, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken = default); /// /// /// /// - /// /// /// public abstract Task CancelScheduledMessageAsync( long sequenceNumber, - ServiceBusRetryPolicy retryPolicy, CancellationToken cancellationToken = default); /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs index 8db1ad6279333..89d98e25c366b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs @@ -136,7 +136,7 @@ public virtual async Task SendAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNull(message, nameof(message)); - await _innerSender.SendAsync(message, _retryPolicy, cancellationToken).ConfigureAwait(false); + await _innerSender.SendAsync(message, cancellationToken).ConfigureAwait(false); } /// @@ -197,7 +197,7 @@ public virtual async Task SendBatchAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNull(messageBatch, nameof(messageBatch)); - await _innerSender.SendBatchAsync(messageBatch, _retryPolicy, cancellationToken).ConfigureAwait(false); + await _innerSender.SendBatchAsync(messageBatch, cancellationToken).ConfigureAwait(false); } /// @@ -215,7 +215,7 @@ public virtual async Task ScheduleMessageAsync( //this.ThrowIfClosed(); Argument.AssertNotNull(message, nameof(message)); message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime; - return await _innerSender.ScheduleMessageAsync(message, _retryPolicy, cancellationToken).ConfigureAwait(false); + return await _innerSender.ScheduleMessageAsync(message, cancellationToken).ConfigureAwait(false); } /// @@ -226,7 +226,7 @@ public virtual async Task ScheduleMessageAsync( public virtual async Task CancelScheduledMessageAsync(long sequenceNumber, CancellationToken cancellationToken = default) { //this.ThrowIfClosed(); - await _innerSender.CancelScheduledMessageAsync(sequenceNumber, _retryPolicy, cancellationToken).ConfigureAwait(false); + await _innerSender.CancelScheduledMessageAsync(sequenceNumber, cancellationToken).ConfigureAwait(false); } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs index e98fa9febe8a1..f67840fd090b8 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs @@ -1,11 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; -using System.Collections.Generic; using System.Linq; -using System.Linq.Expressions; -using System.Text; using System.Threading; using System.Threading.Tasks; using NUnit.Framework; @@ -30,14 +26,10 @@ public async Task Receive_Event(int numThreads) // use double the number of threads so we can make sure we test that we don't // retrieve more messages than expected when there are more messages available - IEnumerable messages = GetMessages(numThreads * 2); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messageBatch = AddMessages(batch, numThreads * 2); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messageBatch); await using var processor = client.GetProcessor(scope.QueueName); int messageCt = 0; @@ -91,14 +83,10 @@ public async Task Receive_StopProcessing(int numThreads) await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); ServiceBusSender sender = client.GetSender(scope.QueueName); int numMessages = 50; - IEnumerable messages = GetMessages(numMessages); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messageBatch = AddMessages(batch, numMessages); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messageBatch); await using var processor = client.GetProcessor(scope.QueueName); int messageProcessedCt = 0; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs index e3a91ffc1608a..bd1b5089be92c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs @@ -3,10 +3,8 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Text; -using System.Threading; using System.Threading.Tasks; using NUnit.Framework; @@ -20,16 +18,12 @@ public async Task Peek() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCt = 10; - IEnumerable sentMessages = GetMessages(messageCt); + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable sentMessages = AddMessages(batch, messageCt).AsEnumerable(); - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); await using var receiver = client.GetReceiver(scope.QueueName); @@ -57,15 +51,12 @@ public async Task ReceiveMessagesInPeekLockMode() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); @@ -96,15 +87,12 @@ public async Task CompleteMessages() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); @@ -131,15 +119,12 @@ public async Task AbandonMessages() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); @@ -174,15 +159,12 @@ public async Task DeadLetterMessages() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); @@ -226,15 +208,12 @@ public async Task DeferMessages() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = client.GetReceiver(scope.QueueName); @@ -269,15 +248,12 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; - IEnumerable messages = GetMessages(messageCount); + + ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var clientOptions = new ServiceBusReceiverOptions() diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs index 3963999c5e309..10ce18bbce3e1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs @@ -98,17 +98,14 @@ public async Task SenderCanSendAMessageBatch() await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); ServiceBusSender sender = client.GetSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messageBatch = AddMessages(batch, 3); - batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("This is a message"))); - batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("This is another message"))); - batch.TryAdd(new ServiceBusMessage(Encoding.UTF8.GetBytes("So many messages"))); - - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messageBatch); } } [Test] - public async Task SenderCanSendZeroLengthMessageBatch() + public async Task SenderCanSendAnEmptyBodyMessageBatch() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { @@ -155,6 +152,23 @@ public async Task SenderCannotSendLargerThanMaximumSize() } } + [Test] + public async Task TryAddReturnsFalseIfSizeExceed() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusSender sender = client.GetSender(scope.QueueName); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + + // Actual limit is 262144 bytes for a single message. + Assert.That(() => batch.TryAdd(new ServiceBusMessage(new byte[200000])), Is.True, "A message was rejected by the batch; all messages should be accepted."); + Assert.That(() => batch.TryAdd(new ServiceBusMessage(new byte[200000])), Is.False, "A message was rejected by the batch; message size exceed."); + + await sender.SendBatchAsync(batch); + } + } + [Test] public async Task ClientProperties() { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs index 37eeb417a10b0..47192a362edb3 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs @@ -21,6 +21,16 @@ protected IEnumerable GetMessages(int count, string sessionId return messages; } + protected ServiceBusMessageBatch AddMessages(ServiceBusMessageBatch batch, int count, string sessionId = null, string partitionKey = null) + { + for (int i = 0; i < count; i++) + { + Assert.That(() => batch.TryAdd(GetMessage(sessionId, partitionKey)), Is.True, "A message was rejected by the batch; all messages should be accepted."); + } + + return batch; + } + protected Task ExceptionHandler(ProcessErrorEventArgs eventArgs) { Assert.Fail(eventArgs.Exception.ToString()); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs index 22e9cc76d6144..12af7bad2c8d2 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs @@ -5,11 +5,9 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Runtime.InteropServices.ComTypes; using System.Text; using System.Threading; using System.Threading.Tasks; -using Azure.Core; using NUnit.Framework; using NUnit.Framework.Internal; @@ -33,13 +31,9 @@ public async Task Peek_Session(long? sequenceNumber, string partitionKey) var sessionId = Guid.NewGuid().ToString(); // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId, partitionKey); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable sentMessages = AddMessages(batch, messageCt, sessionId, partitionKey).AsEnumerable(); - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); Dictionary sentMessageIdToMsg = new Dictionary(); foreach (ServiceBusMessage message in sentMessages) @@ -87,14 +81,10 @@ public async Task Lock_Same_Session_Should_Throw() int messageCt = 10; var sessionId = Guid.NewGuid().ToString(); // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messageBatch = AddMessages(batch, messageCt, sessionId); - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messageBatch); var options = new ServiceBusReceiverOptions { RetryOptions = new ServiceBusRetryOptions @@ -131,14 +121,10 @@ public async Task PeekRange_IncrementsSequenceNumber(int messageCt, int peekCt) var sessionId = Guid.NewGuid().ToString(); // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messagebatch = AddMessages(batch, messageCt, sessionId); - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messagebatch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); long seq = 0; @@ -169,14 +155,10 @@ public async Task Peek_IncrementsSequenceNmber(int messageCt) ServiceBusSender sender = client.GetSender(scope.QueueName); var sessionId = Guid.NewGuid().ToString(); // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messagebatch = AddMessages(batch, messageCt, sessionId); - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + await sender.SendBatchAsync(messagebatch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); @@ -204,17 +186,12 @@ public async Task RoundRobinSessions() var messageCt = 10; HashSet sessions = new HashSet() { "1", "2", "3" }; - using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); // send the messages foreach (string session in sessions) { - var sentMessages = GetMessages(messageCt, session); - - foreach (ServiceBusMessage message in sentMessages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } - await sender.SendBatchAsync(batch); + using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + ServiceBusMessageBatch messageBatch = AddMessages(batch, messageCt, session); + await sender.SendBatchAsync(messageBatch); } // create receiver not scoped to a specific session @@ -226,7 +203,7 @@ public async Task RoundRobinSessions() fromSequenceNumber: 1, maxMessages: 10)) { - var sessionId = receiver.SessionManager.SessionId; + var sessionId = receiver.SessionManager.SessionId; Assert.AreEqual(sessionId, peekedMessage.SessionId); } @@ -246,13 +223,9 @@ public async Task ReceiveMessagesInPeekLockMode() var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName); @@ -290,13 +263,9 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var clientOptions = new ServiceBusReceiverOptions() @@ -338,13 +307,9 @@ public async Task CompleteMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync( @@ -380,13 +345,9 @@ public async Task AbandonMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); ServiceBusReceiver receiver = await client.GetSessionReceiverAsync( @@ -430,13 +391,9 @@ public async Task DeadLetterMessages(bool useSpecificSession) ServiceBusSender sender = client.GetSender(scope.QueueName); var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = await client.GetSessionReceiverAsync( @@ -492,13 +449,9 @@ public async Task DeferMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; - IEnumerable messages = GetMessages(messageCount, sessionId); using ServiceBusMessageBatch batch = await sender.CreateBatchAsync(); + IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable(); - foreach (ServiceBusMessage message in messages) - { - Assert.That(() => batch.TryAdd(message), Is.True, "A message was rejected by the batch; all messages should be accepted."); - } await sender.SendBatchAsync(batch); var receiver = await client.GetSessionReceiverAsync( @@ -701,7 +654,7 @@ async Task ProcessMessage(ProcessMessageEventArgs args) var message = args.Message; await receiver.CompleteAsync(message); sessions.TryRemove(message.SessionId, out bool _); - Assert.AreEqual(message.SessionId, receiver.SessionManager.SessionId); + Assert.AreEqual(message.SessionId, receiver.SessionManager.SessionId); Assert.IsNotNull(receiver.SessionManager.LockedUntilUtc); } finally From 7bad75f701f047a3be21816d8d52cf70b818a120 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Thu, 5 Mar 2020 14:58:09 -0800 Subject: [PATCH 3/3] Move the test files into the folders --- .../tests/{ => Infrastructure}/ServiceBusLiveTestBase.cs | 0 .../tests/{ => Infrastructure}/ServiceBusTestBase.cs | 0 .../tests/{ => Processor}/ProcessorLiveTests.cs | 0 .../tests/{ => Receiver}/ReceiverLiveTests.cs | 0 .../tests/{ => Receiver}/SessionReceiverLiveTests.cs | 0 .../tests/{ => Sender}/SenderLiveTests.cs | 0 .../Azure.Messaging.ServiceBus/tests/{ => Sender}/SenderTests.cs | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Infrastructure}/ServiceBusLiveTestBase.cs (100%) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Infrastructure}/ServiceBusTestBase.cs (100%) mode change 100755 => 100644 rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Processor}/ProcessorLiveTests.cs (100%) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Receiver}/ReceiverLiveTests.cs (100%) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Receiver}/SessionReceiverLiveTests.cs (100%) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Sender}/SenderLiveTests.cs (100%) rename sdk/servicebus/Azure.Messaging.ServiceBus/tests/{ => Sender}/SenderTests.cs (100%) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs old mode 100755 new mode 100644 similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs similarity index 100% rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs