Skip to content

Commit

Permalink
1.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
dariogriffo committed May 3, 2024
1 parent 31ab024 commit 1cc701e
Show file tree
Hide file tree
Showing 29 changed files with 801 additions and 289 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [1.0.4](https://github.com/dariogriffo/marty-net/blob/main/CHANGELOG.md)

Improve logging


## [1.0.3](https://github.com/dariogriffo/marty-net/blob/main/CHANGELOG.md)

Improve pipeline execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/dariogriffo/marty-net</RepositoryUrl>
<Version>1.0.3</Version>
<Version>1.0.4</Version>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<!-- SourceLink settings -->
<PublishRepositoryUrl>true</PublishRepositoryUrl>
Expand Down
117 changes: 37 additions & 80 deletions src/Marty.Net.Aggregates/Internal/AggregatesStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ internal sealed class AggregatesStore : IAggregateStore
public AggregatesStore(
IEventStore eventStore,
IAggregateStreamResolver streamNameResolver,
ILogger<AggregatesStore> logger
ILoggerFactory? loggerFactory
)
{
_eventStore = eventStore;
_streamNameResolver = streamNameResolver;
_logger = logger;
_logger = loggerFactory.CreateLoggerFor<AggregatesStore>();
}

public async Task Create<T>(T aggregate, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -58,12 +58,15 @@ public async Task<T> Hydrate<T>(T aggregate, CancellationToken cancellationToken
);
}

string id = aggregate.Id;
string streamName = _streamNameResolver.StreamForAggregate(aggregate);
_logger.LogTrace("Loading aggregate with id {Id} from stream {StreamName}", id, streamName);

_logger.LogLoadingAggregate(streamName);

List<IEvent> data = await _eventStore.ReadStream(streamName, cancellationToken);
aggregate.LoadFromHistory(data);
_logger.LogTrace("Aggregate with id {Id} loaded", id);

_logger.LogAggregateLoaded(streamName);

return aggregate;
}

Expand All @@ -76,24 +79,14 @@ public async Task<T> HydrateUntilPosition<T>(
{
string id = aggregate.Id;
string streamName = _streamNameResolver.StreamForAggregate(aggregate);
_logger.LogTrace(
"Loading aggregate with id {Id} from stream {StreamName} until position {StreamPosition}",
id,
streamName,
position
);
_logger.LogLoadingAggregateUntilPosition(streamName, position);
List<IEvent> data = await _eventStore.ReadStreamUntilPosition(
streamName,
position,
cancellationToken
);
aggregate.LoadFromHistory(data);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName} until position {StreamPosition}",
id,
streamName,
position
);
_logger.LogAggregateLoadedUntilPosition(id, streamName, position);
return aggregate;
}

Expand All @@ -104,27 +97,16 @@ public async Task<T> HydrateFromPosition<T>(
)
where T : Aggregate
{
string id = aggregate.Id;
string streamName = _streamNameResolver.StreamForAggregate(aggregate);
_logger.LogTrace(
"Loading aggregate with id {Id} from stream {StreamName} from position {StreamPosition}",
id,
streamName,
position
);
_logger.LogLoadingAggregateFromPosition(streamName, position);
List<IEvent> data = await _eventStore.ReadStreamFromPosition(
streamName,
position,
cancellationToken
);
aggregate.Version = position - 1;
aggregate.LoadFromHistory(data);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName} from position {StreamPosition}",
id,
streamName,
position
);
_logger.LogAggregateLoadedFromPosition(streamName, position);
return aggregate;
}

Expand All @@ -135,26 +117,19 @@ public async Task<T> HydrateFromTimestamp<T>(
)
where T : Aggregate
{
string id = aggregate.Id;
string streamName = _streamNameResolver.StreamForAggregate(aggregate);
_logger.LogTrace(
"Loading aggregate with id {Id} from stream {StreamName} from timestamp {Timestamp}",
id,
streamName,
timestamp
);

_logger.LogLoadingAggregateFromTimestamp(streamName, timestamp);

List<IEvent> data = await _eventStore.ReadStreamFromTimestamp(
streamName,
timestamp,
cancellationToken
);
aggregate.LoadFromHistory(data);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName} from timestamp {Timestamp}",
id,
streamName,
timestamp
);

_logger.LogAggregateLoadedFromTimestamp(streamName, timestamp);

return aggregate;
}

Expand All @@ -165,26 +140,18 @@ public async Task<T> HydrateUntilTimestamp<T>(
)
where T : Aggregate
{
string id = aggregate.Id;
string streamName = _streamNameResolver.StreamForAggregate(aggregate);
_logger.LogTrace(
"Loading aggregate with id {Id} from stream {StreamName} from timestamp {Timestamp}",
id,
streamName,
timestamp
);

_logger.LogLoadingAggregateUntilTimestamp(streamName, timestamp);

List<IEvent> data = await _eventStore.ReadStreamUntilTimestamp(
streamName,
timestamp,
cancellationToken
);
aggregate.LoadFromHistory(data);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName} from timestamp {Timestamp}",
id,
streamName,
timestamp
);

_logger.LogAggregateLoadedUntilTimestamp(streamName, timestamp);
return aggregate;
}

Expand All @@ -204,11 +171,11 @@ public async Task<T> GetAggregateById<T>(
where T : Aggregate, new()
{
string streamName = _streamNameResolver.StreamForAggregate<T>(id);
_logger.LogTrace("Loading aggregate with id {Id} from stream {StreamName}", id, streamName);
_logger.LogLoadingAggregate(streamName);
List<IEvent> data = await _eventStore.ReadStream(streamName, cancellationToken);
T aggregate = new();
aggregate.LoadFromHistory(data);
_logger.LogTrace("Aggregate with id {Id} loaded", id);
_logger.LogAggregateLoaded(id);
return aggregate;
}

Expand All @@ -219,12 +186,13 @@ public async Task<T> GetAggregateFromStream<T>(
where T : Aggregate, new()
{
string id = _streamNameResolver.AggregateIdForStream(streamName);
_logger.LogTrace("Loading aggregate with id {Id} from stream {StreamName}", id, streamName);
_logger.LogLoadingAggregate(streamName);
List<IEvent> data = await _eventStore.ReadStream(streamName, cancellationToken);
T aggregate = new();

aggregate.LoadFromHistory(data);
_logger.LogTrace("Aggregate with id {Id} loaded", streamName);
_logger.LogAggregateLoaded(id);

return aggregate;
}

Expand All @@ -235,19 +203,17 @@ public async Task<T> GetAggregateFromStreamUntilPosition<T>(
)
where T : Aggregate, new()
{
_logger.LogTrace("Loading aggregate from stream {StreamName}", streamName);
_logger.LogLoadingAggregate(streamName);
List<IEvent> history = await _eventStore.ReadStreamUntilPosition(
streamName,
position,
cancellationToken
);
T aggregate = new();
aggregate.LoadFromHistory(history);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName}",
aggregate.Id,
streamName
);

_logger.LogAggregateLoaded(aggregate.Id);

return aggregate;
}

Expand All @@ -258,24 +224,16 @@ public async Task<T> GetAggregateFromStreamFromPosition<T>(
)
where T : Aggregate, new()
{
_logger.LogTrace(
"Loading aggregate from stream {StreamName} from position {StreamPosition}",
streamName,
position
);
_logger.LogLoadingAggregateFromPosition(streamName, position);

List<IEvent> data = await _eventStore.ReadStreamFromPosition(
streamName,
position,
cancellationToken
);
T aggregate = new();
aggregate.LoadFromHistory(data);
_logger.LogTrace(
"Aggregate with id {Id} loaded from stream {StreamName} from position {StreamPosition}",
aggregate.Id,
streamName,
position
);
_logger.LogAggregateLoadedFromPosition(streamName, position);
return aggregate;
}

Expand All @@ -286,14 +244,13 @@ public async Task<T> GetAggregateFromStream<T>(
)
where T : Aggregate, new()
{
string id = _streamNameResolver.AggregateIdForStream(streamName);
_logger.LogTrace("Loading aggregate with id {Id} from stream {StreamName}", id, streamName);
_logger.LogLoadingAggregate(streamName);
List<IEvent> data = await _eventStore.ReadStream(streamName, cancellationToken);
T aggregate = new();
IEnumerable<IEvent> history = data.TakeWhile(x => x.Timestamp < lastEventToLoad.Timestamp);

aggregate.LoadFromHistory(history);
_logger.LogTrace("Aggregate with id {Id} loaded", streamName);
_logger.LogAggregateLoaded(streamName);
return aggregate;
}
}
Loading

0 comments on commit 1cc701e

Please sign in to comment.