Skip to content

Commit

Permalink
Develop (#455)
Browse files Browse the repository at this point in the history
* Added snapshot support to sagas via a SnapshotSagaRepository
Switched to using threadsafe collections in a threaded operation

* Updated logging packages.
Corrected an incorrect exception type being raised for connection strings.
Corrected a file and class name TableStoragEventStoreModule => TableStorageEventStoreModule.
Added .NET Core dependency modules for blob and table storage data stores and event stores.

* Updated ConcurrencyException to have ExpectedVersion and FoundVersion.
Updated AzureServiceBus generation of SourceMethod
  • Loading branch information
cdmdotnet authored Oct 21, 2024
1 parent 702c32f commit bdbf8cc
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 20 deletions.
32 changes: 28 additions & 4 deletions Framework/Azure/Cqrs.Azure.ServiceBus/AzureServiceBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Chinchilla.Logging;
Expand All @@ -38,7 +39,6 @@
using Manager = Microsoft.ServiceBus.NamespaceManager;
using IMessageReceiver = Microsoft.ServiceBus.Messaging.SubscriptionClient;
#endif

#if NET462
using Microsoft.Identity.Client;
#endif
Expand Down Expand Up @@ -234,6 +234,10 @@ public abstract class AzureServiceBus<TAuthenticationToken>
/// </summary>
protected IList<string> ExclusionNamespaces { get; private set; }

private IList<string> TaskRelatedMethodNames { get; }

private Regex ContainerNameMatcher { get; }

#if NET462
/// <summary>
/// Gets an access token from Active Directory when using RBAC based connections.
Expand All @@ -249,16 +253,25 @@ protected AzureServiceBus(IConfigurationManager configurationManager, IMessageSe
{
AzureBusHelper = azureBusHelper;
BusHelper = busHelper;
Signer = hashAlgorithmFactory;

TelemetryHelper = new NullTelemetryHelper();
PrivateServiceBusReceivers = new Dictionary<int, IMessageReceiver>();
PublicServiceBusReceivers = new Dictionary<int, IMessageReceiver>();
TimeoutOnSendRetryMaximumCount = 1;

string timeoutOnSendRetryMaximumCountValue;
short timeoutOnSendRetryMaximumCount;
if (ConfigurationManager.TryGetSetting("Cqrs.Azure.Servicebus.TimeoutOnSendRetryMaximumCount", out timeoutOnSendRetryMaximumCountValue) && !string.IsNullOrWhiteSpace(timeoutOnSendRetryMaximumCountValue) && short.TryParse(timeoutOnSendRetryMaximumCountValue, out timeoutOnSendRetryMaximumCount))
TimeoutOnSendRetryMaximumCount = timeoutOnSendRetryMaximumCount;

ExclusionNamespaces = new SynchronizedCollection<string> { "Cqrs", "System" };
Signer = hashAlgorithmFactory;
TaskRelatedMethodNames = new List<string>
{
"MoveNext",
"Start"
};
ContainerNameMatcher = new Regex("^(.)+?>", RegexOptions.IgnoreCase);

#if NET462
InstantiateActiveDirectoryToken();
Expand Down Expand Up @@ -1410,14 +1423,25 @@ BrokeredMessage CreateBrokeredMessage
foreach (StackFrame frame in stackFrames)
{
MethodBase method = frame.GetMethod();
if (method.ReflectedType == null)
if (method.ReflectedType == null || string.IsNullOrWhiteSpace(method.ReflectedType.FullName))
continue;

try
{
if (ExclusionNamespaces.All(@namespace => !method.ReflectedType.FullName.StartsWith(@namespace)))
{
brokeredMessage.AddUserProperty("Source-Method", $"{method.ReflectedType.FullName}.{method.Name}");
string container;
Match match;
if (method.DeclaringType.IsSealed && method.DeclaringType.IsNestedPrivate && method.DeclaringType.IsAutoLayout && TaskRelatedMethodNames.Contains(method.Name) && (match = ContainerNameMatcher.Match(method.ReflectedType.FullName)).Success)
{
container = match.Value.Substring(0, match.Value.Length - 1).Replace("+<", "\\");
}
else
{
container = $"{method.ReflectedType.FullName}\\{method.Name}";
}

brokeredMessage.AddUserProperty("Source-Method", container);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@
using TestCleanup = NUnit.Framework.TearDownAttribute;
using TestContext = System.Object;





#if NET472
#else
using System.Threading.Tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public void Apply(TestEvent e)
EventCount++;
}

#endregion

protected override void SetId(ISagaEvent<Guid> sagaEvent)
{
// We set Id as the eventstore is using that and not an IEventWithIdentity
Expand All @@ -50,8 +52,6 @@ protected override void RestoreFromSnapshot(TestSagaSnapshot snapshot)
{
EventCount = snapshot.EventCount;
}

#endregion
}

public class TestSagaSnapshot : Snapshot
Expand Down
2 changes: 1 addition & 1 deletion Framework/Cqrs/Domain/AggregateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async Task SaveAsync
eventStoreResults = await EventStore.GetAsync(aggregate.GetType(), aggregate.Id, false, expectedVersion.Value);
#endif
if (eventStoreResults.Any())
throw new ConcurrencyException(aggregate.Id);
throw new ConcurrencyException(aggregate.Id, expectedVersion.Value, eventStoreResults.First().Version);
}

var eventsToPublish = new List<IEvent<TAuthenticationToken>>();
Expand Down
30 changes: 28 additions & 2 deletions Framework/Cqrs/Domain/Exceptions/ConcurrencyException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,42 @@ public class ConcurrencyException : Exception
/// Instantiate a new instance of <see cref="ConcurrencyException"/> with the provided identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> that had a concurrency issue.
/// </summary>
/// <param name="id">The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> that wasn't found.</param>
public ConcurrencyException(Guid id)
: base(string.Format("A different version than expected was found in aggregate {0}", id))
/// <param name="expectedVersion">The version that was expected.</param>
/// <param name="foundVersion">The version that was found.</param>
public ConcurrencyException(Guid id, int? expectedVersion = null, int? foundVersion = null)
: base(GenerateMessage(id, expectedVersion, foundVersion))
{
Id = id;
ExpectedVersion = expectedVersion;
FoundVersion = foundVersion;
}

static string GenerateMessage(Guid id, int? expectedVersion = null, int? foundVersion = null)
{
string pattern = $"A different version than expected was found in aggregate {id}";
if (expectedVersion != null)
pattern = string.Concat(pattern, $". Expected Version {expectedVersion}");
if (foundVersion != null)
pattern = string.Concat(pattern, $". Found Version {foundVersion}");
return pattern;
}

/// <summary>
/// The identifier of the <see cref="IAggregateRoot{TAuthenticationToken}"/> that had a concurrency issue.
/// </summary>
[DataMember]
public Guid Id { get; set; }

/// <summary>
/// The version that was expected.
/// </summary>
[DataMember]
public int? ExpectedVersion { get; set; }

/// <summary>
/// The version that was found.
/// </summary>
[DataMember]
public int? FoundVersion { get; set; }
}
}
2 changes: 1 addition & 1 deletion Framework/Cqrs/Domain/SagaRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ await EventStore.GetAsync
#endif
(saga.GetType(), saga.Id, false, expectedVersion.Value);
if (eventStoreResults.Any())
throw new ConcurrencyException(saga.Id);
throw new ConcurrencyException(saga.Id, expectedVersion.Value, eventStoreResults.First().Version);
}

var eventsToPublish = new List<ISagaEvent<TAuthenticationToken>>();
Expand Down
6 changes: 3 additions & 3 deletions Framework/Cqrs/Domain/SagaUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async Task AddAsync
};
TrackedSagas.Add(saga.Id, sagaDescriptor);
}
else if (((TrackedSagas[saga.Id]).Saga) != (ISaga<TAuthenticationToken>)saga)
else if (TrackedSagas[saga.Id].Saga != (ISaga<TAuthenticationToken>)saga)
throw new ConcurrencyException(saga.Id);
#if NET40
#else
Expand All @@ -102,7 +102,7 @@ async Task<TSaga> GetAsync
{
var trackedSaga = (TSaga)TrackedSagas[id].Saga;
if (expectedVersion != null && trackedSaga.Version != expectedVersion)
throw new ConcurrencyException(trackedSaga.Id);
throw new ConcurrencyException(trackedSaga.Id, expectedVersion.Value, trackedSaga.Version);
return trackedSaga;
}

Expand All @@ -114,7 +114,7 @@ async Task<TSaga> GetAsync
#endif
<TSaga>(id);
if (expectedVersion != null && saga.Version != expectedVersion)
throw new ConcurrencyException(id);
throw new ConcurrencyException(id, expectedVersion.Value, saga.Version);
#if NET40
Add
#else
Expand Down
6 changes: 3 additions & 3 deletions Framework/Cqrs/Domain/UnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async Task AddAsync
};
TrackedAggregates.Add(aggregate.Id, aggregateDescriptor);
}
else if (((TrackedAggregates[aggregate.Id]).Aggregate) != (IAggregateRoot<TAuthenticationToken>)aggregate)
else if (TrackedAggregates[aggregate.Id].Aggregate != (IAggregateRoot<TAuthenticationToken>)aggregate)
throw new ConcurrencyException(aggregate.Id);
#if NET40
#else
Expand All @@ -103,7 +103,7 @@ async Task<TAggregateRoot> GetAsync
{
var trackedAggregate = (TAggregateRoot)TrackedAggregates[id].Aggregate;
if (expectedVersion != null && trackedAggregate.Version != expectedVersion)
throw new ConcurrencyException(trackedAggregate.Id);
throw new ConcurrencyException(trackedAggregate.Id, expectedVersion.Value, trackedAggregate.Version);
return trackedAggregate;
}

Expand All @@ -115,7 +115,7 @@ async Task<TAggregateRoot> GetAsync
#endif
<TAggregateRoot>(id);
if (expectedVersion != null && aggregate.Version != expectedVersion)
throw new ConcurrencyException(id);
throw new ConcurrencyException(id, expectedVersion.Value, aggregate.Version);
#if NET40
Add
#else
Expand Down

0 comments on commit bdbf8cc

Please sign in to comment.