Skip to content

Commit

Permalink
Merge branch 'bugfix/max-concurrent-callls'
Browse files Browse the repository at this point in the history
  • Loading branch information
Tankatronic committed Apr 7, 2019
2 parents 73b6d5f + db42d12 commit d63bf44
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 9 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ Please also make sure all feature additions have a corresponding unit test.

## Release notes:

v6.0.4 (Netstd only)
- fix bug that would cause the ServiceBusCommunicationListener to close before all messages are processed, when MaxConcurrentCalls is greater than 1.

v6.0.3 (Netstd only)
- add IServiceBusCommunicationListener to DefaultServiceBusMessageReceiver

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ public abstract class ServiceBusCommunicationListener : IServiceBusCommunication
{
private readonly CancellationTokenSource _stopProcessingMessageTokenSource;

protected int ConcurrencyCount = 1;

protected bool IsClosing;

//prevents aborts during the processing of a message
protected ManualResetEvent ProcessingMessage { get; } = new ManualResetEvent(true);
protected SemaphoreSlim ProcessingMessage { get; set; }

/// <summary>
/// Gets the <see cref="ServiceContext"/> that was used to create this instance. Can be null.
Expand All @@ -69,6 +73,12 @@ public abstract class ServiceBusCommunicationListener : IServiceBusCommunication
/// </summary>
protected CancellationToken StopProcessingMessageToken { get; }

/// <summary>
/// Gets or sets the amount of time to wait for remaining messages to process when <see cref="CloseAsync(CancellationToken)"/> is invoked.
/// Defaults to 1 minute.
/// </summary>
public TimeSpan CloseTimeout { get; set; } = TimeSpan.FromMinutes(1);

/// <summary>
/// Gets or sets the prefetch size when receiving Service Bus Messages. (Defaults to 0, which indicates no prefetch)
/// Set to 20 times the total number of messages that a single receiver can process per second.
Expand Down Expand Up @@ -144,9 +154,23 @@ protected ServiceBusCommunicationListener(ServiceContext context
public Task CloseAsync(CancellationToken cancellationToken)
{
WriteLog("Service Bus Communication Listnener closing");
IsClosing = true;
_stopProcessingMessageTokenSource.Cancel();

//Wait for Message processing to complete..
ProcessingMessage.WaitOne();
Task.WaitAny(
// Timeout task.
Task.Run(() => Thread.Sleep(CloseTimeout)),
// Wait for all processing messages to finish.
Task.Run(() =>
{
while(ConcurrencyCount > 0)
{
ProcessingMessage.Wait();
ConcurrencyCount--;
}
}));

ProcessingMessage.Dispose();
return CloseImplAsync(cancellationToken);
}
Expand Down Expand Up @@ -202,8 +226,7 @@ public void Dispose()
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;
ProcessingMessage.Set();
ProcessingMessage.Dispose();

_stopProcessingMessageTokenSource.Cancel();
_stopProcessingMessageTokenSource.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ protected override void ListenForMessages()
if (MaxConcurrentCalls.HasValue)
{
options.MaxConcurrentCalls = MaxConcurrentCalls.Value;
ProcessingMessage = new SemaphoreSlim(options.MaxConcurrentCalls, options.MaxConcurrentCalls);
ConcurrencyCount = options.MaxConcurrentCalls;
}
else
{
ProcessingMessage = new SemaphoreSlim(1, 1);
}
ServiceBusClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}
Expand All @@ -109,7 +115,15 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
{
try
{
ProcessingMessage.Reset();
if (IsClosing)
{
// We want the thread to sleep and not return immediately.
// Returning immediately could increment the message fail count and send it to dead letter.
Thread.Sleep(CloseTimeout);
return;
}

ProcessingMessage.Wait();
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
Expand All @@ -119,8 +133,26 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
}
finally
{
ProcessingMessage.Set();
ProcessingMessage.Release();
}
}

protected override void Dispose(bool disposing)
{
if (!disposing) return;

if (MaxConcurrentCalls.HasValue)
{
ProcessingMessage.Release(MaxConcurrentCalls.Value);
}
else
{
ProcessingMessage.Release();
}

ProcessingMessage.Dispose();

base.Dispose(disposing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ protected override void ListenForMessages()
if (MaxConcurrentCalls.HasValue)
{
options.MaxConcurrentCalls = MaxConcurrentCalls.Value;
ProcessingMessage = new SemaphoreSlim(options.MaxConcurrentCalls, options.MaxConcurrentCalls);
ConcurrencyCount = options.MaxConcurrentCalls;
}
else
{
ProcessingMessage = new SemaphoreSlim(1, 1);
}
ServiceBusClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}
Expand All @@ -114,7 +120,7 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
{
try
{
ProcessingMessage.Reset();
ProcessingMessage.Wait();
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
Expand All @@ -124,7 +130,7 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
}
finally
{
ProcessingMessage.Set();
ProcessingMessage.Release();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<Description>Receive BrokeredMessages in ServiceFabric Services through Service Bus. Add this package to your Reliable Service projects. The ServiceFabric.ServiceBus.Services Class Library can be used in conjunction with ServiceFabric.ServiceBus.Clients (optional)</Description>
<Copyright>2019</Copyright>
<VersionPrefix>6.0.3</VersionPrefix>
<VersionPrefix>6.0.4</VersionPrefix>
<Authors>Loek Duys</Authors>
<TargetFramework>netstandard2.0</TargetFramework>
<PlatformTarget>x64</PlatformTarget>
Expand Down

0 comments on commit d63bf44

Please sign in to comment.