forked from oskardudycz/EventSourcing.NetCore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventStoreDBSubscriptionCheckpointRepository.cs
74 lines (61 loc) · 2.57 KB
/
EventStoreDBSubscriptionCheckpointRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
using Core.EventStoreDB.Serialization;
using EventStore.Client;
namespace Core.EventStoreDB.Subscriptions;
public record CheckpointStored(string SubscriptionId, ulong? Position, DateTime CheckpointedAt);
public class EventStoreDBSubscriptionCheckpointRepository: ISubscriptionCheckpointRepository
{
private readonly EventStoreClient eventStoreClient;
public EventStoreDBSubscriptionCheckpointRepository(
EventStoreClient eventStoreClient)
{
this.eventStoreClient = eventStoreClient ?? throw new ArgumentNullException(nameof(eventStoreClient));
}
public async ValueTask<ulong?> Load(string subscriptionId, CancellationToken ct)
{
var streamName = GetCheckpointStreamName(subscriptionId);
var result = eventStoreClient.ReadStreamAsync(Direction.Backwards, streamName, StreamPosition.End, 1,
cancellationToken: ct);
if (await result.ReadState == ReadState.StreamNotFound)
{
return null;
}
ResolvedEvent? @event = await result.FirstOrDefaultAsync(ct);
return @event?.Deserialize<CheckpointStored>()?.Position;
}
public async ValueTask Store(string subscriptionId, ulong position, CancellationToken ct)
{
var @event = new CheckpointStored(subscriptionId, position, DateTime.UtcNow);
var eventToAppend = new[] {@event.ToJsonEventData()};
var streamName = GetCheckpointStreamName(subscriptionId);
try
{
// store new checkpoint expecting stream to exist
await eventStoreClient.AppendToStreamAsync(
streamName,
StreamState.StreamExists,
eventToAppend,
cancellationToken: ct
);
}
catch (WrongExpectedVersionException)
{
// WrongExpectedVersionException means that stream did not exist
// Set the checkpoint stream to have at most 1 event
// using stream metadata $maxCount property
await eventStoreClient.SetStreamMetadataAsync(
streamName,
StreamState.NoStream,
new StreamMetadata(1),
cancellationToken: ct
);
// append event again expecting stream to not exist
await eventStoreClient.AppendToStreamAsync(
streamName,
StreamState.NoStream,
eventToAppend,
cancellationToken: ct
);
}
}
private static string GetCheckpointStreamName(string subscriptionId) => $"checkpoint_{subscriptionId}";
}