Skip to content

Commit

Permalink
first netstd steps
Browse files Browse the repository at this point in the history
  • Loading branch information
LoekD authored and LoekD committed Aug 23, 2018
1 parent d958651 commit 4ee955c
Showing 12 changed files with 8,620 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace ServiceFabric.ServiceBus.Services.Netstd
{
/// <summary>
/// Marks a class as capable of receiving <see cref="Message"/>s, with added cancellation support.
/// </summary>
public interface IServiceBusMessageReceiver
{
/// <summary>
/// Indicates whether a batch of messages should be automatically completed after processing.
/// </summary>
bool AutoComplete { get; }

/// <summary>
/// Processes a message. Must perform error handling and also message completion or abandoning.
/// </summary>
/// <param name="message">The incoming Service Bus Message to process</param>
/// <param name="cancellationToken">When Set, indicates that processing should stop.</param>
Task ReceiveMessageAsync(Message message, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
using System;
using System.Fabric;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace ServiceFabric.ServiceBus.Services.Netstd
{
/// <summary>
/// Abstract base implementation for <see cref="ICommunicationListener"/> connected to ServiceBus
/// </summary>
public abstract class ServiceBusCommunicationListener : ICommunicationListener, IDisposable
{
private readonly CancellationTokenSource _stopProcessingMessageTokenSource;

//prevents aborts during the processing of a message
protected ManualResetEvent ProcessingMessage { get; } = new ManualResetEvent(true);

/// <summary>
/// Gets the <see cref="ServiceContext"/> that was used to create this instance. Can be null.
/// </summary>
protected ServiceContext Context { get; }

/// <summary>
/// Gets a Service Bus connection string that should have only receive-rights.
/// </summary>
protected string ReceiveConnectionString { get; }

/// <summary>
/// Gets a Service Bus connection string that should have only send-rights.
/// </summary>
protected string SendConnectionString { get; }

/// <summary>
/// When <see cref="CancellationToken.IsCancellationRequested"/> is true, this indicates that either <see cref="CloseAsync"/>
/// or <see cref="Abort"/> was called.
/// </summary>
protected CancellationToken StopProcessingMessageToken { get; }

/// <summary>
/// Gets or sets the prefetch size when receiving Service Bus Messages. (Defaults to 0, which indicates no prefetch)
/// Set to 20 times the total number of messages that a single receiver can process per second.
/// </summary>
public int MessagePrefetchCount { get; set; }

/// <summary>
/// Gets or sets the timeout for receiving a batch of Service Bus Messages. (Defaults to 30s)
/// </summary>
public TimeSpan ServerTimeout { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the Service Bus client ReceiveMode.
/// </summary>
public ReceiveMode ReceiveMode { get; set; } = ReceiveMode.PeekLock;

/// <summary>
/// Gets or sets a callback for writing logs. (Defaults to null)
/// </summary>
public Action<string> LogAction { get; set; }

/// <summary>
/// Retry policy for client.
/// </summary>
public RetryPolicy RetryPolicy { get; set; }


/// <summary>
/// Creates a new instance.
/// </summary>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
protected ServiceBusCommunicationListener(ServiceContext context
, string serviceBusSendConnectionString
, string serviceBusReceiveConnectionString
)
{
if (string.IsNullOrWhiteSpace(serviceBusSendConnectionString)) serviceBusSendConnectionString = "not:/available";
if (string.IsNullOrWhiteSpace(serviceBusReceiveConnectionString)) throw new ArgumentOutOfRangeException(nameof(serviceBusReceiveConnectionString));

Context = context;
SendConnectionString = serviceBusSendConnectionString;
ReceiveConnectionString = serviceBusReceiveConnectionString;

_stopProcessingMessageTokenSource = new CancellationTokenSource();
StopProcessingMessageToken = _stopProcessingMessageTokenSource.Token;
}

/// <summary>
/// This method causes the communication listener to be opened. Once the Open
/// completes, the communication listener becomes usable - accepts and sends messages.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// A <see cref="T:System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
/// the endpoint string.
/// </returns>
public abstract Task<string> OpenAsync(CancellationToken cancellationToken);

/// <summary>
/// This method causes the communication listener to close. Close is a terminal state and
/// this method allows the communication listener to transition to this state in a
/// graceful manner.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// A <see cref="T:System.Threading.Tasks.Task">Task</see> that represents outstanding operation.
/// </returns>
public Task CloseAsync(CancellationToken cancellationToken)
{
WriteLog("Service Bus Communication Listnener closing");
_stopProcessingMessageTokenSource.Cancel();
//Wait for Message processing to complete..
ProcessingMessage.WaitOne();
ProcessingMessage.Dispose();
return CloseImplAsync(cancellationToken);
}

/// <summary>
/// This method causes the communication listener to close. Close is a terminal state and
/// this method causes the transition to close ungracefully. Any outstanding operations
/// (including close) should be canceled when this method is called.
/// </summary>
public virtual void Abort()
{
WriteLog("Service Bus Communication Listnener aborting");
Dispose();
}

/// <summary>
/// This method causes the communication listener to close. Close is a terminal state and
/// this method allows the communication listener to transition to this state in a
/// graceful manner.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// A <see cref="T:System.Threading.Tasks.Task">Task</see> that represents outstanding operation.
/// </returns>
protected virtual Task CloseImplAsync(CancellationToken cancellationToken)
{
return Task.FromResult(true);
}


/// <summary>
/// Writes a log entry if <see cref="LogAction"/> is not null.
/// </summary>
/// <param name="message"></param>
/// <param name="callerMemberName"></param>
protected void WriteLog(string message, [CallerMemberName]string callerMemberName = "unknown")
{
LogAction?.Invoke($"{GetType().FullName} \t {callerMemberName} \t {message}");
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
GC.SuppressFinalize(this);
Dispose(true);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;
ProcessingMessage.Set();
ProcessingMessage.Dispose();
_stopProcessingMessageTokenSource.Cancel();
_stopProcessingMessageTokenSource.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Fabric;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace ServiceFabric.ServiceBus.Services.Netstd
{
/// <summary>
/// Implementation of <see cref="ICommunicationListener"/> that listens to a Service Bus Queue.
/// </summary>
public class ServiceBusQueueCommunicationListener : ServiceBusQueueCommunicationListenerBase
{
/// <summary>Gets or sets the maximum duration within which the lock will be renewed automatically. This
/// value should be greater than the longest message lock duration; for example, the LockDuration Property. </summary>
/// <value>The maximum duration during which locks are automatically renewed.</value>
public TimeSpan? AutoRenewTimeout { get; set; }

/// <summary>
/// (Ignored when using Sessions) Gets or sets the MaxConcurrentCalls that will be passed to the <see cref="Receiver"/>. Can be null.
/// </summary>
public int? MaxConcurrentCalls { get; set; }

/// <summary>
/// Processor for messages.
/// </summary>
protected IServiceBusMessageReceiver Receiver { get; }

/// <summary>
/// Creates a new instance, using the init parameters of a <see cref="StatefulService"/>
/// </summary>
/// <param name="receiver">(Required) Processes incoming messages.</param>
/// <param name="context">(Optional) The context that was used to init the Reliable Service that uses this listener.</param>
/// <param name="serviceBusQueueName">(Optional) The name of the monitored Service Bus Queue (EntityPath in connectionstring is supported too)</param>
/// <param name="serviceBusSendConnectionString">(Optional) A Service Bus connection string that can be used for Sending messages.
/// (Returned as Service Endpoint.).
/// </param>
/// <param name="serviceBusReceiveConnectionString">(Required) A Service Bus connection string that can be used for Receiving messages.
/// </param>
public ServiceBusQueueCommunicationListener(IServiceBusMessageReceiver receiver,
ServiceContext context,
string serviceBusQueueName,
string serviceBusSendConnectionString,
string serviceBusReceiveConnectionString)
: base(context, serviceBusQueueName, serviceBusSendConnectionString, serviceBusReceiveConnectionString)
{
Receiver = receiver;
}

/// <summary>
/// Starts listening for messages on the configured Service Bus Queue.
/// </summary>
protected override void ListenForMessages()
{
var options = new MessageHandlerOptions(ExceptionReceivedHandler);
if (AutoRenewTimeout.HasValue)
{
options.MaxAutoRenewDuration = AutoRenewTimeout.Value;
}
if (MaxConcurrentCalls.HasValue)
{
options.MaxConcurrentCalls = MaxConcurrentCalls.Value;
}
ServiceBusClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}

/// <summary>
/// Logs the error and continues.
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
protected virtual Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
{
LogAction($"There was an error while receiving a message: {args.Exception.Message}");
return Task.CompletedTask;
}

/// <summary>
/// Will pass an incoming message to the <see cref="Receiver"/> for processing.
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
protected async Task ReceiveMessageAsync(Message message, CancellationToken cancellationToken)
{
try
{
ProcessingMessage.Reset();
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
{
await ServiceBusClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
finally
{
ProcessingMessage.Set();
}
}
}
}
Loading

0 comments on commit 4ee955c

Please sign in to comment.