Skip to content

Commit

Permalink
Updated consistency exercise
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Sep 6, 2024
1 parent 703bdbe commit cf425a4
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Polly" Version="8.4.1"/>
<PackageReference Include="xunit" Version="2.9.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PrivateAssets>all</PrivateAssets>
Expand Down
14 changes: 12 additions & 2 deletions Workshops/EventDrivenArchitecture/05-Consistency/Core/Database.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public ValueTask Store<T>(Guid id, T obj, CancellationToken _) where T : class
try
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

storage[GetId<T>(id)] = obj;

Expand All @@ -30,7 +30,7 @@ public ValueTask Delete<T>(Guid id, CancellationToken _)
try
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

storage.Remove(GetId<T>(id));

Expand All @@ -51,6 +51,16 @@ public ValueTask Delete<T>(Guid id, CancellationToken _)
: null
);

public ValueTask<T[]> Filter<T>(Func<T, bool> predicate, CancellationToken _) where T : class =>
ValueTask.FromResult(
storage.Values
.OfType<T>()
.Where(predicate)
.Select(record =>
JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(record))!
).ToArray()
);

public async ValueTask Transaction(Func<Database, ValueTask> action)
{
Monitor.Enter(storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class EventBus
public async ValueTask Publish(object[] events, CancellationToken ct)
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

foreach (var @event in events)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Consistency.Core;

public class TimeoutException(string message): Exception(message);

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Polly" Version="8.4.1" />
<PackageReference Include="xunit" Version="2.9.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public ValueTask Store<T>(Guid id, T obj, CancellationToken _) where T : class
try
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

storage[GetId<T>(id)] = obj;

Expand All @@ -30,7 +30,7 @@ public ValueTask Delete<T>(Guid id, CancellationToken _)
try
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

storage.Remove(GetId<T>(id));

Expand All @@ -51,6 +51,16 @@ public ValueTask Delete<T>(Guid id, CancellationToken _)
: null
);

public ValueTask<T[]> Filter<T>(Func<T, bool> predicate, CancellationToken _) where T : class =>
ValueTask.FromResult(
storage.Values
.OfType<T>()
.Where(predicate)
.Select(record =>
JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(record))!
).ToArray()
);

public async ValueTask Transaction(Func<Database, ValueTask> action)
{
Monitor.Enter(storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class EventBus
public async ValueTask Publish(object[] events, CancellationToken ct)
{
if (DateTimeOffset.Now.Ticks % 5 == 0)
throw new InvalidOperationException("Database not available!");
throw new TimeoutException("Database not available!");

foreach (var @event in events)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Consistency.Core;

public class TimeoutException(string message): Exception(message);
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Polly;
using Polly.Retry;

namespace Consistency.Sagas.Version1_Aggregates.Core;

public class Retry
{
public static int RetriesCount;

private static readonly AsyncRetryPolicy retryPolicy = Policy
.Handle<TimeoutException>()
.WaitAndRetryForeverAsync(
(_, _) =>
{
RetriesCount++;

return TimeSpan.FromMilliseconds(1);
});

public Task UntilSucceeds(Func<CancellationToken, ValueTask> handle, CancellationToken token = default) =>
retryPolicy.ExecuteAsync(async ct => await handle(ct), token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public async ValueTask InitiateGroupCheckout(InitiateGroupCheckout command, Canc
command.Now
);


await database.Store(command.GroupCheckoutId, groupCheckout, ct);
await eventBus.Publish(groupCheckout.DequeueUncommittedEvents(), ct);
}
Expand Down

0 comments on commit cf425a4

Please sign in to comment.