Skip to content

Commit

Permalink
fix bug that completes messages twice when auto complete is true
Browse files Browse the repository at this point in the history
  • Loading branch information
Loek committed May 17, 2019
1 parent 119cbbe commit 2dec5d9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 87 deletions.
159 changes: 81 additions & 78 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Please also make sure all feature additions have a corresponding unit test.

## Release notes:

v6.0.4 (Netstd only)
v7.0.1 (Netstd only)
- fix bug that completes messages twice when auto complete is true

v7.0.0 (Netstd only)
- fix bug that would cause the ServiceBusCommunicationListener to close before all messages are processed, when MaxConcurrentCalls is greater than 1.

v6.0.3 (Netstd only)
Expand Down Expand Up @@ -134,14 +137,14 @@ v3.5.0
[..]
Action<string> logAction = log => ServiceEventSource.Current.ServiceMessage(this, log);
new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(
new Handler(logAction)
, context
, serviceBusQueueName
new Handler(logAction)
, context
, serviceBusQueueName
, requireSessions: false)
{
MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50), //auto renew every 50s, so processing can take longer than 60s (default lock duration).
{
MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50), //auto renew every 50s, so processing can take longer than 60s (default lock duration).
LogAction = logAction
}, "StatelessService-ServiceBusQueueListener");
}, "StatelessService-ServiceBusQueueListener");
[..]
```

Expand Down Expand Up @@ -187,14 +190,14 @@ Make sure your projects are configured to build as 64 bit programs!
```javascript
internal sealed class Handler : IServiceBusMessageReceiver
{
private readonly StatefulService _service;

public Handler(StatefulService service)
{
_service = service;
}
public Task ReceiveMessageAsync(BrokeredMessage message, MessageSession session, CancellationToken cancellationToken)
private readonly StatefulService _service;

public Handler(StatefulService service)
{
_service = service;
}
public Task ReceiveMessageAsync(BrokeredMessage message, MessageSession session, CancellationToken cancellationToken)
{
ServiceEventSource.Current.ServiceMessage(_service, $"Handling subscription message {message.MessageId}");
return Task.FromResult(true);
Expand All @@ -206,86 +209,86 @@ internal sealed class Handler : IServiceBusMessageReceiver
```javascript
internal sealed class SampleQueueListeningStatefulService : StatefulService
{
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define a QueueName:
string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");

yield return new ServiceReplicaListener(context => new ServiceBusQueueCommunicationListener(
new Handler(this)
, context
, serviceBusQueueName), "StatefulService-ServiceBusQueueListener");
}
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define a QueueName:
string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");

yield return new ServiceReplicaListener(context => new ServiceBusQueueCommunicationListener(
new Handler(this)
, context
, serviceBusQueueName), "StatefulService-ServiceBusQueueListener");
}
}
```
------------------------------------
## To create a Stateful Service that can be accessed through a Service Bus Subscription:
```javascript
internal sealed class SampleSubscriptionListeningStatefulService : StatefulService
{
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define Topic & Subscription Names:
string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

yield return new ServiceReplicaListener(context => new ServiceBusSubscriptionCommunicationListener(
new Handler(this)
, context
, serviceBusTopicName
, serviceBusSubscriptionName), "StatefulService-ServiceBusSubscriptionListener");
}
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define Topic & Subscription Names:
string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

yield return new ServiceReplicaListener(context => new ServiceBusSubscriptionCommunicationListener(
new Handler(this)
, context
, serviceBusTopicName
, serviceBusSubscriptionName), "StatefulService-ServiceBusSubscriptionListener");
}
}
```
------------------------------------
## To create a Stateless Service that can be accessed through a Service Bus Queue:
```javascript
internal sealed class SampleQueueListeningStatelessService : StatelessService
{
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define a QueueName:
string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(
new Handler(this)
, context
, serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
}
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define a QueueName:
string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(
new Handler(this)
, context
, serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
}
}
```
------------------------------------
## To create a Stateless Service that can be accessed through a Service Bus Subscription:
```javascript
internal sealed class SampleSubscriptionListeningStatelessService : StatelessService
{
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define Topic & Subscription Names:
string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

yield return new ServiceInstanceListener(context => new ServiceBusSubscriptionCommunicationListener(
new Handler(this)
, context
, serviceBusTopicName
, serviceBusSubscriptionName), "StatelessService-ServiceBusSubscriptionListener");
}
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
// In the configuration file, define connection strings:
// "Microsoft.ServiceBus.ConnectionString.Receive"
// and "Microsoft.ServiceBus.ConnectionString.Send"
// Also, define Topic & Subscription Names:
string serviceBusTopicName = CloudConfigurationManager.GetSetting("TopicName");
string serviceBusSubscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");

yield return new ServiceInstanceListener(context => new ServiceBusSubscriptionCommunicationListener(
new Handler(this)
, context
, serviceBusTopicName
, serviceBusSubscriptionName), "StatelessService-ServiceBusSubscriptionListener");
}
}
```

Expand Down Expand Up @@ -314,9 +317,9 @@ var servicePartitionClient = new ServicePartitionClient<ServiceBusTopicCommunica
//use the proxy to send a message to the Service
servicePartitionClient.InvokeWithRetry(c => c.SendMessage(new BrokeredMessage()
{
Properties =
{
{ "TestKey", "TestValue" }
}
Properties =
{
{ "TestKey", "TestValue" }
}
}));
```
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,6 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
ProcessingMessage.Wait(cancellationToken);
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
{
await ServiceBusClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ protected async Task ReceiveMessageAsync(Message message, CancellationToken canc
ProcessingMessage.Wait(cancellationToken);
var combined = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, StopProcessingMessageToken).Token;
await Receiver.ReceiveMessageAsync(message, combined);
if (Receiver.AutoComplete)
{
await ServiceBusClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<Description>Receive BrokeredMessages in ServiceFabric Services through Service Bus. Add this package to your Reliable Service projects. The ServiceFabric.ServiceBus.Services Class Library can be used in conjunction with ServiceFabric.ServiceBus.Clients (optional)</Description>
<Copyright>2019</Copyright>
<VersionPrefix>7.0.0</VersionPrefix>
<VersionPrefix>7.0.1</VersionPrefix>
<Authors>Loek Duys</Authors>
<TargetFramework>netstandard2.0</TargetFramework>
<PlatformTarget>x64</PlatformTarget>
Expand Down

0 comments on commit 2dec5d9

Please sign in to comment.