Skip to content

Commit

Permalink
breaking change that introduces OpenImpAsync to override and OpenAsyn…
Browse files Browse the repository at this point in the history
…c to initialize the semaphore.
  • Loading branch information
Loek committed Apr 19, 2019
1 parent 376470c commit 72243bb
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 168 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using System;
using System.Collections.Generic;
using System.Fabric;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace ServiceFabric.ServiceBus.Services.Netstd.CommunicationListeners
{
Expand Down Expand Up @@ -146,7 +146,19 @@ protected ServiceBusCommunicationListener(ServiceContext context
/// A <see cref="T:System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
/// the endpoint string.
/// </returns>
public abstract Task<string> OpenAsync(CancellationToken cancellationToken);
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
if (MaxConcurrentCalls.HasValue)
{
ProcessingMessage = new SemaphoreSlim(MaxConcurrentCalls.Value, MaxConcurrentCalls.Value);
}
else
{
ProcessingMessage = new SemaphoreSlim(1, 1);
}

return OpenImplAsync(cancellationToken);
}

/// <summary>
/// This method causes the communication listener to close. Close is a terminal state and
Expand Down Expand Up @@ -174,7 +186,7 @@ await Task.WhenAny(
{
// ReSharper disable once AccessToDisposedClosure
// ReSharper disable once EmptyGeneralCatchClause
try { ProcessingMessage.Wait(cancellationToken);}
try { ProcessingMessage.Wait(cancellationToken); }
catch { }
}
}, cancellationToken));
Expand All @@ -199,16 +211,20 @@ public virtual void Abort()
/// Starts listening for messages on the configured Service Bus Queue / Subscription
/// Make sure to call 'base' when overriding.
/// </summary>
protected virtual void ListenForMessages()
protected abstract void ListenForMessages();

/// <summary>
/// This method causes the communication listener to be opened. Once the Open
/// completes, the communication listener becomes usable - accepts and sends messages.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// A <see cref="T:System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
/// the endpoint string.
/// </returns>
protected virtual Task<string> OpenImplAsync(CancellationToken cancellationToken)
{
if (MaxConcurrentCalls.HasValue)
{
ProcessingMessage = new SemaphoreSlim(MaxConcurrentCalls.Value, MaxConcurrentCalls.Value);
}
else
{
ProcessingMessage = new SemaphoreSlim(1, 1);
}
return Task.FromResult("endpoint://");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,150 +1,148 @@
using System;
using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace ServiceFabric.ServiceBus.Services.Netstd.CommunicationListeners
{
/// <summary>
/// Implementation of <see cref="ICommunicationListener"/> that listens to a Service Bus Queue.
/// </summary>
public class ServiceBusQueueCommunicationListener : ServiceBusQueueCommunicationListenerBase
{
/// <summary>Gets or sets the maximum duration within which the lock will be renewed automatically. This
/// value should be greater than the longest message lock duration; for example, the LockDuration Property. </summary>
/// <value>The maximum duration during which locks are automatically renewed.</value>
public TimeSpan? AutoRenewTimeout { get; set; }

/// <summary>
/// Processor for messages.
/// </summary>
protected IServiceBusMessageReceiver Receiver { get; }

/// <summary>
/// Creates a new instance, using the init parameters of a <see cref="StatefulService"/>
/// </summary>
/// <param name="receiver">(Required) Processes incoming messages.</param>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusQueueName">(Optional) The name of the monitored Service Bus Queue (EntityPath in connectionstring is supported too)</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
public ServiceBusQueueCommunicationListener(IServiceBusMessageReceiver receiver,
ServiceContext context,
string serviceBusQueueName,
string serviceBusSendConnectionString,
string serviceBusReceiveConnectionString)
: base(context, serviceBusQueueName, serviceBusSendConnectionString, serviceBusReceiveConnectionString)
{
Receiver = receiver ?? throw new ArgumentNullException(nameof(receiver));
}

/// <summary>
/// Creates a new instance, using the init parameters of a <see cref="StatefulService"/>
/// </summary>
/// <param name="receiverFactory">(Required) Creates a handler that processes incoming messages.</param>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusQueueName">(Optional) The name of the monitored Service Bus Queue (EntityPath in connectionstring is supported too)</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
public ServiceBusQueueCommunicationListener(Func<IServiceBusCommunicationListener, IServiceBusMessageReceiver> receiverFactory,
ServiceContext context,
string serviceBusQueueName,
string serviceBusSendConnectionString,
string serviceBusReceiveConnectionString)
: base(context, serviceBusQueueName, serviceBusSendConnectionString, serviceBusReceiveConnectionString)
{
if (receiverFactory == null) throw new ArgumentNullException(nameof(receiverFactory));
var serviceBusMessageReceiver = receiverFactory(this);
Receiver = serviceBusMessageReceiver ?? throw new ArgumentException("Receiver factory cannot return null.", nameof(receiverFactory));
}

/// <summary>
/// Starts listening for messages on the configured Service Bus Queue.
/// </summary>
protected override void ListenForMessages()
{
base.ListenForMessages();

var options = new MessageHandlerOptions(ExceptionReceivedHandler);
if (AutoRenewTimeout.HasValue)
{
options.MaxAutoRenewDuration = AutoRenewTimeout.Value;
}
if (MaxConcurrentCalls.HasValue)
{
options.MaxConcurrentCalls = MaxConcurrentCalls.Value;
}

ServiceBusClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}

/// <summary>
/// Logs the error and continues.
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
protected virtual Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
{
LogAction($"There was an error while receiving a message: {args.Exception.Message}");
return Task.CompletedTask;
}

/// <summary>
/// Will pass an incoming message to the <see cref="Receiver"/> for processing.
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
protected async Task ReceiveMessageAsync(Message message, CancellationToken cancellationToken)
{
try
{
if (IsClosing)
{
// We want the thread to sleep and not return immediately.
// Returning immediately could increment the message fail count and send it to dead letter.
Thread.Sleep(CloseTimeout);
return;
}

ProcessingMessage.Wait(cancellationToken);
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
{
await ServiceBusClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
finally
{
ProcessingMessage.Release();
}
}

protected override void Dispose(bool disposing)
{
if (!disposing) return;

if (MaxConcurrentCalls.HasValue)
{
ProcessingMessage.Release(MaxConcurrentCalls.Value);
}
using System;
using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace ServiceFabric.ServiceBus.Services.Netstd.CommunicationListeners
{
/// <summary>
/// Implementation of <see cref="ICommunicationListener"/> that listens to a Service Bus Queue.
/// </summary>
public class ServiceBusQueueCommunicationListener : ServiceBusQueueCommunicationListenerBase
{
/// <summary>Gets or sets the maximum duration within which the lock will be renewed automatically. This
/// value should be greater than the longest message lock duration; for example, the LockDuration Property. </summary>
/// <value>The maximum duration during which locks are automatically renewed.</value>
public TimeSpan? AutoRenewTimeout { get; set; }

/// <summary>
/// Processor for messages.
/// </summary>
protected IServiceBusMessageReceiver Receiver { get; }

/// <summary>
/// Creates a new instance, using the init parameters of a <see cref="StatefulService"/>
/// </summary>
/// <param name="receiver">(Required) Processes incoming messages.</param>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusQueueName">(Optional) The name of the monitored Service Bus Queue (EntityPath in connectionstring is supported too)</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
public ServiceBusQueueCommunicationListener(IServiceBusMessageReceiver receiver,
ServiceContext context,
string serviceBusQueueName,
string serviceBusSendConnectionString,
string serviceBusReceiveConnectionString)
: base(context, serviceBusQueueName, serviceBusSendConnectionString, serviceBusReceiveConnectionString)
{
Receiver = receiver ?? throw new ArgumentNullException(nameof(receiver));
}

/// <summary>
/// Creates a new instance, using the init parameters of a <see cref="StatefulService"/>
/// </summary>
/// <param name="receiverFactory">(Required) Creates a handler that processes incoming messages.</param>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusQueueName">(Optional) The name of the monitored Service Bus Queue (EntityPath in connectionstring is supported too)</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
public ServiceBusQueueCommunicationListener(Func<IServiceBusCommunicationListener, IServiceBusMessageReceiver> receiverFactory,
ServiceContext context,
string serviceBusQueueName,
string serviceBusSendConnectionString,
string serviceBusReceiveConnectionString)
: base(context, serviceBusQueueName, serviceBusSendConnectionString, serviceBusReceiveConnectionString)
{
if (receiverFactory == null) throw new ArgumentNullException(nameof(receiverFactory));
var serviceBusMessageReceiver = receiverFactory(this);
Receiver = serviceBusMessageReceiver ?? throw new ArgumentException("Receiver factory cannot return null.", nameof(receiverFactory));
}

/// <summary>
/// Starts listening for messages on the configured Service Bus Queue.
/// </summary>
protected override void ListenForMessages()
{
var options = new MessageHandlerOptions(ExceptionReceivedHandler);
if (AutoRenewTimeout.HasValue)
{
options.MaxAutoRenewDuration = AutoRenewTimeout.Value;
}
if (MaxConcurrentCalls.HasValue)
{
options.MaxConcurrentCalls = MaxConcurrentCalls.Value;
}

ServiceBusClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}

/// <summary>
/// Logs the error and continues.
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
protected virtual Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
{
LogAction($"There was an error while receiving a message: {args.Exception.Message}");
return Task.CompletedTask;
}

/// <summary>
/// Will pass an incoming message to the <see cref="Receiver"/> for processing.
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
protected async Task ReceiveMessageAsync(Message message, CancellationToken cancellationToken)
{
try
{
if (IsClosing)
{
// We want the thread to sleep and not return immediately.
// Returning immediately could increment the message fail count and send it to dead letter.
Thread.Sleep(CloseTimeout);
return;
}

ProcessingMessage.Wait(cancellationToken);
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
{
await ServiceBusClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
finally
{
ProcessingMessage.Release();
}
}

protected override void Dispose(bool disposing)
{
if (!disposing) return;

if (MaxConcurrentCalls.HasValue)
{
ProcessingMessage.Release(MaxConcurrentCalls.Value);
}
else
{
ProcessingMessage.Release();
}

ProcessingMessage.Dispose();

base.Dispose(disposing);
}
}
ProcessingMessage.Dispose();

base.Dispose(disposing);
}
}
}
Loading

0 comments on commit 72243bb

Please sign in to comment.