Skip to content

Commit

Permalink
Merge pull request #61 from Cratis:dolittle-event-stream-formalzation
Browse files Browse the repository at this point in the history
Extracted out EventStream queries
  • Loading branch information
einari authored Nov 11, 2021
2 parents f63b2df + d9b5b98 commit 5838a42
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 41 deletions.
48 changes: 48 additions & 0 deletions Source/Extensions/Dolittle/EventStore/EventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Extensions.MongoDB;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;

namespace Cratis.Extensions.Dolittle.EventStore
{
/// <summary>
/// Represents an implementation of <see cref="IEventStore"/>.
/// </summary>
public class EventStore : IEventStore
{
readonly IMongoClient _client;
readonly IMongoDatabase _database;
readonly ILoggerFactory _loggerFactory;

/// <summary>
/// Initializes a new instance of the <see cref="EventStore"/> class.
/// </summary>
/// <param name="mongoDBClientFactory"><see cref="IMongoDBClientFactory"/> for connecting to MongoDB.</param>
/// <param name="loggerFactory"><see cref="ILoggerFactory"/> for creating loggers.</param>
public EventStore(IMongoDBClientFactory mongoDBClientFactory, ILoggerFactory loggerFactory)
{
var mongoUrlBuilder = new MongoUrlBuilder
{
Servers = new[] { new MongoServerAddress("localhost", 27017) }
};
var url = mongoUrlBuilder.ToMongoUrl();
var settings = MongoClientSettings.FromUrl(url);
_client = mongoDBClientFactory.Create(settings);
_database = _client.GetDatabase("event_store");
_loggerFactory = loggerFactory;
}

/// <inheritdoc/>
public IEventStream GetStream(EventStreamId id)
{
if (id.Equals(EventStreamId.EventLog))
{
return new EventStream(_database.GetCollection<Event>("event-log"), _loggerFactory.CreateLogger<EventStream>());
}

throw new NotImplementedException("Event log is the only stream supported at this point");
}
}
}
58 changes: 58 additions & 0 deletions Source/Extensions/Dolittle/EventStore/EventStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Dolittle.SDK.Events;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;

namespace Cratis.Extensions.Dolittle.EventStore
{
/// <summary>
/// Represents an implementation of <see cref="IEventStream"/>.
/// </summary>
public class EventStream : IEventStream
{
readonly IMongoCollection<Event> _collection;
readonly ILogger<EventStream> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="EventStream"/> class.
/// </summary>
/// <param name="collection"><see cref="IMongoCollection{T}"/> that holds the event stream.</param>
/// <param name="logger"><see cref="ILogger{T}"/> for logging.</param>
public EventStream(IMongoCollection<Event> collection, ILogger<EventStream> logger)
{
_collection = collection;
_logger = logger;
}

/// <inheritdoc/>
public IChangeStreamCursor<ChangeStreamDocument<Event>> Watch() => _collection.Watch(options: new() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup });

/// <inheritdoc/>
public long Count() => _collection.CountDocuments(FilterDefinition<Event>.Empty);

/// <inheritdoc/>
public async Task<IAsyncCursor<Event>> GetFromPosition(uint position, IEnumerable<EventType>? eventTypes = default, EventSourceId? eventSourceId = default)
{
var offsetFilter = Builders<Event>.Filter.Gt(_ => _.Id, position);
var eventTypeFilters = eventTypes?.Select(_ => Builders<Event>.Filter.Eq(_ => _.Metadata.TypeId, _.Id.Value)).ToArray() ?? Array.Empty<FilterDefinition<Event>>();
var eventSourceFilter = (eventSourceId is null) ? FilterDefinition<Event>.Empty : Builders<Event>.Filter.Eq(_ => _.Metadata.EventSource, eventSourceId.Value);

var filter = Builders<Event>.Filter.And(
offsetFilter,
eventSourceFilter,
Builders<Event>.Filter.Or(eventTypeFilters)
);

_logger.GettingEventsFromOffset(position);

return await _collection.FindAsync(
filter,
new()
{
Sort = Builders<Event>.Sort.Ascending(_ => _.Id)
});
}
}
}
12 changes: 12 additions & 0 deletions Source/Extensions/Dolittle/EventStore/EventStreamId.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Concepts;

namespace Cratis.Extensions.Dolittle.EventStore
{
public record EventStreamId(Guid Value) : ConceptAs<Guid>(Value)
{
public static readonly EventStreamId EventLog = new(Guid.Empty);
}
}
16 changes: 16 additions & 0 deletions Source/Extensions/Dolittle/EventStore/EventStreamLogMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft.Extensions.Logging;

namespace Cratis.Extensions.Dolittle.EventStore
{
/// <summary>
/// Holds log messages for <see cref="EventStream"/>.
/// </summary>
public static partial class EventStreamLogMessages
{
[LoggerMessage(0, LogLevel.Trace, "Getting events from offset {Offset}")]
internal static partial void GettingEventsFromOffset(this ILogger logger, uint offset);
}
}
18 changes: 18 additions & 0 deletions Source/Extensions/Dolittle/EventStore/IEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Extensions.Dolittle.EventStore
{
/// <summary>
/// Defines a system for working with the Dolittle event store.
/// </summary>
public interface IEventStore
{
/// <summary>
/// Get a specific <see cref="IEventStream"/>.
/// </summary>
/// <param name="id"><see cref="EventStreamId"/>.</param>
/// <returns><see cref="IEventStream"/>.</returns>
IEventStream GetStream(EventStreamId id);
}
}
35 changes: 35 additions & 0 deletions Source/Extensions/Dolittle/EventStore/IEventStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Dolittle.SDK.Events;
using MongoDB.Driver;

namespace Cratis.Extensions.Dolittle.EventStore
{
/// <summary>
/// Defines an API for working with a Dolittle event stream.
/// </summary>
public interface IEventStream
{
/// <summary>
/// Count number of events in the stream.
/// </summary>
/// <returns>The count of events.</returns>
long Count();

/// <summary>
/// Watch for changes on the event stream.
/// </summary>
/// <returns><see cref="IChangeStreamCursor{T}"/> for changes.</returns>
IChangeStreamCursor<ChangeStreamDocument<Event>> Watch();

/// <summary>
/// Get events from a specific position.
/// </summary>
/// <param name="position">Position to get from.</param>
/// <param name="eventTypes">Optional event types to get. If not specified, all will be given.</param>
/// <param name="eventSourceId">Optional <see cref="EventSourceId"/>.</param>
/// <returns><see cref="IAsyncCursor{Event}"/> for events in the stream.</returns>
Task<IAsyncCursor<Event>> GetFromPosition(uint position, IEnumerable<EventType>? eventTypes = default, EventSourceId? eventSourceId = default);
}
}
47 changes: 13 additions & 34 deletions Source/Extensions/Dolittle/Projections/ProjectionEventProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
using System.Dynamic;
using System.Reactive.Subjects;
using Cratis.Events.Projections;
using Cratis.Extensions.MongoDB;
using Microsoft.Extensions.Logging;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using IEventStore = Cratis.Extensions.Dolittle.EventStore.IEventStore;
using IEventStream = Cratis.Extensions.Dolittle.EventStore.IEventStream;

namespace Cratis.Extensions.Dolittle.Projections
{
Expand All @@ -17,9 +18,8 @@ namespace Cratis.Extensions.Dolittle.Projections
/// </summary>
public class ProjectionEventProvider : IProjectionEventProvider
{
readonly IMongoClient _client;
readonly IMongoDatabase _database;
readonly IMongoCollection<EventStore.Event> _eventLogCollection;
readonly IEventStore _eventStore;
readonly IEventStream _eventStream;
readonly IProjectionPositions _projectionPositions;
readonly ILogger<ProjectionEventProvider> _logger;
readonly ConcurrentDictionary<IProjection, ReplaySubject<Event>> _projectionsWithSubject = new();
Expand All @@ -28,23 +28,16 @@ public class ProjectionEventProvider : IProjectionEventProvider
/// <summary>
/// Initializes a new instance of <see cref="ProjectionEventProvider"/>.
/// </summary>
/// <param name="mongoDBClientFactory"><see cref="IMongoDBClientFactory"/> for connecting to MongoDB.</param>
/// <param name="eventStore">The Dolittle <see cref="IEventStore"/>.</param>
/// <param name="projectionPositions"><see cref="IProjectionPositions"/> for maintaining positions.</param>
/// <param name="logger"><see cref="ILogger{T}"/> for logging.</param>
public ProjectionEventProvider(
IMongoDBClientFactory mongoDBClientFactory,
IEventStore eventStore,
IProjectionPositions projectionPositions,
ILogger<ProjectionEventProvider> logger)
{
var mongoUrlBuilder = new MongoUrlBuilder
{
Servers = new[] { new MongoServerAddress("localhost", 27017) }
};
var url = mongoUrlBuilder.ToMongoUrl();
var settings = MongoClientSettings.FromUrl(url);
_client = mongoDBClientFactory.Create(settings);
_database = _client.GetDatabase("event_store");
_eventLogCollection = _database.GetCollection<EventStore.Event>("event-log");
_eventStore = eventStore;
_eventStream = eventStore.GetStream(EventStore.EventStreamId.EventLog);
_projectionPositions = projectionPositions;
_logger = logger;
}
Expand Down Expand Up @@ -94,7 +87,7 @@ void WatchForEvents()
{
Task.Run(async () =>
{
var cursor = _eventLogCollection.Watch(options: new() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup });
var cursor = _eventStream.Watch();
while (cursor.MoveNext())
{
if (!cursor.Current.Any()) continue;
Expand All @@ -119,7 +112,7 @@ async Task StartProvidingFor(IProjection projection, ReplaySubject<Event> subjec
}

await CatchUp(projection, subject);
var tail = _eventLogCollection.CountDocuments(FilterDefinition<EventStore.Event>.Empty);
var tail = _eventStream.Count();
await _projectionPositions.Save(projection, (uint)tail);
_projectionsWithSubject.TryAdd(projection, subject);
WatchForEvents();
Expand All @@ -134,27 +127,13 @@ async Task CatchUp(IProjection projection, ReplaySubject<Event> subject)
{
_logger.CatchingUp(projection.Identifier);
var offset = await _projectionPositions.GetFor(projection);
var eventTypeFilters = projection.EventTypes.Select(_ => Builders<EventStore.Event>.Filter.Eq(_ => _.Metadata.TypeId, Guid.Parse(_.Value))).ToArray();
var eventTypes = projection.EventTypes.Select(_ => new global::Dolittle.SDK.Events.EventType(Guid.Parse(_.Value))).ToArray();

var exhausted = false;

while (!exhausted)
{
var offsetFilter = Builders<EventStore.Event>.Filter.Gt(_ => _.Id, offset.Value);
var filter = Builders<EventStore.Event>.Filter.And(
offsetFilter,
Builders<EventStore.Event>.Filter.Or(eventTypeFilters)
);

_logger.GettingEventsFromOffset(offset.Value);

var cursor = await _eventLogCollection.FindAsync(
filter,
new()
{
Sort = Builders<EventStore.Event>.Sort.Ascending(_ => _.Id)
});

var cursor = await _eventStream.GetFromPosition(offset.Value, eventTypes);
while (await cursor.MoveNextAsync())
{
if (!cursor.Current.Any())
Expand Down Expand Up @@ -183,7 +162,7 @@ async Task<EventLogSequenceNumber> OnNext(IProjection projection, ReplaySubject<
@event.Id,
eventType,
@event.Metadata.Occurred,
@event.Metadata.EventSource.ToString(),
@event.Metadata.EventSource,
content));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ public static partial class ProjectionEventProviderLogMessages
[LoggerMessage(4, LogLevel.Debug, "Catching up projection '{Projection}'")]
internal static partial void CatchingUp(this ILogger logger, ProjectionId projection);

[LoggerMessage(5, LogLevel.Trace, "Getting events from offset {Offset}")]
internal static partial void GettingEventsFromOffset(this ILogger logger, uint offset);

[LoggerMessage(6, LogLevel.Trace, "Providing event with sequence number {SequenceNumber}")]
[LoggerMessage(5, LogLevel.Trace, "Providing event with sequence number {SequenceNumber}")]
internal static partial void ProvidingEvent(this ILogger logger, uint sequenceNumber);

[LoggerMessage(7, LogLevel.Information, "Projection '{Projection}' is not interested in any event types, skipping catch up.")]
[LoggerMessage(6, LogLevel.Information, "Projection '{Projection}' is not interested in any event types, skipping catch up.")]
internal static partial void SkippingProvidingForProjectionDueToNoEventTypes(this ILogger logger, ProjectionId projection);

[LoggerMessage(8, LogLevel.Error, "Error during starting projection '{Projection}'")]
[LoggerMessage(7, LogLevel.Error, "Error during starting projection '{Projection}'")]
internal static partial void ErrorStartingProviding(this ILogger logger, ProjectionId projection, Exception exception);
}
}
4 changes: 3 additions & 1 deletion Source/Extensions/Dolittle/Projections/Projections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ void ActualStartAll()

var projection = _projectionSerializer.CreateFrom(parsed);
var projectionPositions = new ProjectionPositions(_mongoDBClientFactory);
var provider = new ProjectionEventProvider(_mongoDBClientFactory, projectionPositions, _loggerFactory.CreateLogger<ProjectionEventProvider>());

var eventStore = new EventStore.EventStore(_mongoDBClientFactory, _loggerFactory);
var provider = new ProjectionEventProvider(eventStore, projectionPositions, _loggerFactory.CreateLogger<ProjectionEventProvider>());
var changesetStorage = new MongoDBChangesetStorage(_mongoDBClientFactory);
var pipeline = new ProjectionPipeline(provider, projection, changesetStorage, _loggerFactory.CreateLogger<ProjectionPipeline>());
//var storage = new InMemoryProjectionStorage();
Expand Down

0 comments on commit 5838a42

Please sign in to comment.