Skip to content

Commit 4139ebf

Browse files
authoredApr 14, 2023
pref: issues 557 (#558)
* chore: Adjust cache validity period * pref: Optimize integration events * chore: Processing Topic * rename: IntegrationEventId -> EventId * chore: Remove code smell
1 parent a208967 commit 4139ebf

File tree

21 files changed

+116
-139
lines changed

21 files changed

+116
-139
lines changed
 

‎src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/DomainCommand.cs

+8-13
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,23 @@
33

44
namespace Masa.BuildingBlocks.Ddd.Domain.Events;
55

6-
public abstract record DomainCommand : IDomainCommand
6+
public abstract record DomainCommand(Guid EventId, DateTime EvenCreateTime) : IDomainCommand
77
{
8-
private Guid _eventId;
9-
private DateTime _creationTime;
8+
[JsonInclude] public Guid EventId { private get; set; } = EventId;
9+
10+
[JsonInclude] public DateTime EvenCreateTime { private get; set; } = EvenCreateTime;
1011

1112
[NotMapped]
1213
[JsonIgnore]
1314
public IUnitOfWork? UnitOfWork { get; set; }
1415

1516
protected DomainCommand() : this(Guid.NewGuid(), DateTime.UtcNow) { }
1617

17-
protected DomainCommand(Guid eventId, DateTime creationTime)
18-
{
19-
_eventId = eventId;
20-
_creationTime = creationTime;
21-
}
22-
23-
public Guid GetEventId() => _eventId;
18+
public Guid GetEventId() => EventId;
2419

25-
public void SetEventId(Guid eventId) => _eventId = eventId;
20+
public void SetEventId(Guid eventId) => EventId = eventId;
2621

27-
public DateTime GetCreationTime() => _creationTime;
22+
public DateTime GetCreationTime() => EvenCreateTime;
2823

29-
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
24+
public void SetCreationTime(DateTime creationTime) => EvenCreateTime = creationTime;
3025
}

‎src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/DomainEvent.cs

+8-13
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,23 @@
33

44
namespace Masa.BuildingBlocks.Ddd.Domain.Events;
55

6-
public abstract record DomainEvent : IDomainEvent
6+
public abstract record DomainEvent(Guid EventId, DateTime EvenCreateTime) : IDomainEvent
77
{
8-
private Guid _eventId;
9-
private DateTime _creationTime;
8+
[JsonInclude] public Guid EventId { private get; set; } = EventId;
9+
10+
[JsonInclude] public DateTime EvenCreateTime { private get; set; } = EvenCreateTime;
1011

1112
[NotMapped]
1213
[JsonIgnore]
1314
public IUnitOfWork? UnitOfWork { get; set; }
1415

1516
protected DomainEvent() : this(Guid.NewGuid(), DateTime.UtcNow) { }
1617

17-
protected DomainEvent(Guid eventId, DateTime creationTime)
18-
{
19-
_eventId = eventId;
20-
_creationTime = creationTime;
21-
}
22-
23-
public Guid GetEventId() => _eventId;
18+
public Guid GetEventId() => EventId;
2419

25-
public void SetEventId(Guid eventId) => _eventId = eventId;
20+
public void SetEventId(Guid eventId) => EventId = eventId;
2621

27-
public DateTime GetCreationTime() => _creationTime;
22+
public DateTime GetCreationTime() => EvenCreateTime;
2823

29-
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
24+
public void SetCreationTime(DateTime creationTime) => EvenCreateTime = creationTime;
3025
}

‎src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/DomainQuery.cs

+8-13
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33

44
namespace Masa.BuildingBlocks.Ddd.Domain.Events;
55

6-
public abstract record DomainQuery<TResult> : IDomainQuery<TResult>
6+
public abstract record DomainQuery<TResult>(Guid EventId, DateTime EvenCreateTime) : IDomainQuery<TResult>
77
{
8-
private Guid _eventId;
9-
private DateTime _creationTime;
8+
[JsonInclude] public Guid EventId { private get; set; } = EventId;
9+
10+
[JsonInclude] public DateTime EvenCreateTime { private get; set; } = EvenCreateTime;
1011

1112
[JsonIgnore]
1213
public IUnitOfWork? UnitOfWork
@@ -19,17 +20,11 @@ public IUnitOfWork? UnitOfWork
1920

2021
protected DomainQuery() : this(Guid.NewGuid(), DateTime.UtcNow) { }
2122

22-
protected DomainQuery(Guid eventId, DateTime creationTime)
23-
{
24-
_eventId = eventId;
25-
_creationTime = creationTime;
26-
}
27-
28-
public Guid GetEventId() => _eventId;
23+
public Guid GetEventId() => EventId;
2924

30-
public void SetEventId(Guid eventId) => _eventId = eventId;
25+
public void SetEventId(Guid eventId) => EventId = eventId;
3126

32-
public DateTime GetCreationTime() => _creationTime;
27+
public DateTime GetCreationTime() => EvenCreateTime;
3328

34-
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
29+
public void SetCreationTime(DateTime creationTime) => EvenCreateTime = creationTime;
3530
}

‎src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/IntegrationDomainEvent.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33

44
namespace Masa.BuildingBlocks.Ddd.Domain.Events;
55

6-
public abstract record IntegrationDomainEvent(Guid Id, DateTime CreationTime) : DomainEvent(Id, CreationTime), IIntegrationDomainEvent
6+
public abstract record IntegrationDomainEvent(Guid EventId, DateTime EvenCreateTime) : DomainEvent(EventId, EvenCreateTime), IIntegrationDomainEvent
77
{
8-
[JsonIgnore]
98
public virtual string Topic { get; set; }
109

1110
protected IntegrationDomainEvent() : this(Guid.NewGuid(), DateTime.UtcNow)

‎src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.Events/Event.cs

+8-11
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,21 @@
33

44
namespace Masa.BuildingBlocks.Dispatcher.Events;
55

6-
public abstract record Event : IEvent
6+
public abstract record Event(Guid EventId, DateTime EvenCreateTime) : IEvent
77
{
8-
private Guid _eventId;
9-
private DateTime _creationTime;
8+
[JsonInclude] public Guid EventId { private get; set; } = EventId;
109

11-
protected Event() : this(Guid.NewGuid(), DateTime.UtcNow) { }
10+
[JsonInclude] public DateTime EvenCreateTime { private get; set; } = EvenCreateTime;
1211

13-
protected Event(Guid eventId, DateTime creationTime)
12+
protected Event() : this(Guid.NewGuid(), DateTime.UtcNow)
1413
{
15-
_eventId = eventId;
16-
_creationTime = creationTime;
1714
}
1815

19-
public Guid GetEventId() => _eventId;
16+
public Guid GetEventId() => EventId;
2017

21-
public void SetEventId(Guid eventId) => _eventId = eventId;
18+
public void SetEventId(Guid eventId) => EventId = eventId;
2219

23-
public DateTime GetCreationTime() => _creationTime;
20+
public DateTime GetCreationTime() => EvenCreateTime;
2421

25-
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
22+
public void SetCreationTime(DateTime creationTime) => EvenCreateTime = creationTime;
2623
}

‎src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.Events/_Imports.cs

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33

44
global using Microsoft.Extensions.DependencyInjection;
55
global using System.Reflection;
6+
global using System.Text.Json.Serialization;

‎src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/IntegrationEvent.cs

+14-13
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,32 @@ namespace Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
55

66
public abstract record IntegrationEvent : IIntegrationEvent
77
{
8-
private Guid _eventId;
9-
private DateTime _creationTime;
8+
[JsonInclude]public Guid EventId { private get; set; }
109

11-
[NotMapped]
12-
[JsonIgnore]
13-
public IUnitOfWork? UnitOfWork { get; set; }
10+
[JsonInclude]
11+
public DateTime EvenCreateTime { private get; set; }
12+
13+
[NotMapped] [JsonIgnore] public IUnitOfWork? UnitOfWork { get; set; }
1414

15-
[JsonIgnore]
1615
public virtual string Topic { get; set; }
1716

18-
protected IntegrationEvent() : this(Guid.NewGuid(), DateTime.UtcNow) { }
17+
protected IntegrationEvent() : this(Guid.NewGuid(), DateTime.UtcNow)
18+
{
19+
}
1920

2021
protected IntegrationEvent(Guid eventId, DateTime creationTime)
2122
{
2223
if (string.IsNullOrWhiteSpace(Topic)) Topic = GetType().Name;
2324

24-
_eventId = eventId;
25-
_creationTime = creationTime;
25+
EventId = eventId;
26+
EvenCreateTime = creationTime;
2627
}
2728

28-
public Guid GetEventId() => _eventId;
29+
public Guid GetEventId() => EventId;
2930

30-
public void SetEventId(Guid eventId) => _eventId = eventId;
31+
public void SetEventId(Guid eventId) => EventId = eventId;
3132

32-
public DateTime GetCreationTime() => _creationTime;
33+
public DateTime GetCreationTime() => EvenCreateTime;
3334

34-
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
35+
public void SetCreationTime(DateTime creationTime) => EvenCreateTime = creationTime;
3536
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) MASA Stack All rights reserved.
2+
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.
3+
4+
// ReSharper disable once CheckNamespace
5+
6+
namespace Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
7+
8+
/// <summary>
9+
/// Only used to get Topic
10+
/// Avoid directly obtaining the value of the Topic because you are worried that the global JsonSerializerOptions will be modified, resulting in failure to obtain the Topic
11+
/// </summary>
12+
internal class IntegrationEventTopic : ITopic
13+
{
14+
public string Topic { get; set; }
15+
}

‎src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IntegrationEventLog.cs

+14-7
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ public class IntegrationEventLog : IHasConcurrencyStamp
1414
[NotMapped]
1515
public string EventTypeShortName => EventTypeName.Split('.').Last();
1616

17+
private object? _event;
18+
19+
[NotMapped] public object Event => _event ??= JsonSerializer.Deserialize<object>(Content)!;
20+
1721
[NotMapped]
18-
public IIntegrationEvent Event { get; private set; } = null!;
22+
public string Topic { get; private set; } = null!;
1923

2024
public IntegrationEventStates State { get; set; } = IntegrationEventStates.NotPublished;
2125

@@ -43,22 +47,25 @@ public IntegrationEventLog(IIntegrationEvent @event, Guid transactionId) : this(
4347
CreationTime = @event.GetCreationTime();
4448
ModificationTime = @event.GetCreationTime();
4549
EventTypeName = @event.GetType().FullName!;
46-
Content = System.Text.Json.JsonSerializer.Serialize((object)@event);
50+
Content = JsonSerializer.Serialize((object)@event);
4751
TransactionId = transactionId;
4852
}
4953

5054
public void Initialize()
5155
{
52-
this.CreationTime = this.GetCurrentTime();
56+
CreationTime = GetCurrentTime();
5357
}
5458

5559
public virtual DateTime GetCurrentTime() => DateTime.UtcNow;
5660

57-
public IntegrationEventLog DeserializeJsonContent(Type type)
61+
public IntegrationEventLog DeserializeJsonContent()
5862
{
59-
Event = (System.Text.Json.JsonSerializer.Deserialize(Content, type) as IIntegrationEvent)!;
60-
Event?.SetEventId(this.EventId);
61-
Event?.SetCreationTime(this.CreationTime);
63+
var json = JsonSerializer.Deserialize<IntegrationEventTopic>(Content);
64+
Topic = json!.Topic;
65+
if (Topic.IsNullOrWhiteSpace())
66+
{
67+
Topic = EventTypeShortName;//Used to handle when the Topic is not persisted, it is consistent with the class name by default
68+
}
6269
return this;
6370
}
6471

‎src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/_Imports.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
global using Masa.BuildingBlocks.Data;
55
global using Masa.BuildingBlocks.Data.UoW;
66
global using Masa.BuildingBlocks.Dispatcher.Events;
7-
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
8-
global using Microsoft.Extensions.Options;
97
global using System.ComponentModel.DataAnnotations.Schema;
108
global using System.Data.Common;
119
global using System.Runtime.CompilerServices;
10+
global using System.Text.Json;
1211
global using System.Text.Json.Serialization;

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ public Publisher(IServiceProvider serviceProvider, string pubSubName)
1616
_pubSubName = pubSubName;
1717
}
1818

19-
public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
19+
public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default)
2020
{
21-
await DaprClient.PublishEventAsync<object>(_pubSubName, topicName, @event, stoppingToken);
21+
await DaprClient.PublishEventAsync(_pubSubName, topicName, @event, stoppingToken);
2222
}
2323
}

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ namespace Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore;
55

66
public class IntegrationEventLogService : IIntegrationEventLogService
77
{
8-
private readonly IEnumerable<Type> _integrationEventTypes;
98
private readonly IntegrationEventLogContext _eventLogContext;
109
private readonly ILogger<IntegrationEventLogService>? _logger;
1110

1211
public IntegrationEventLogService(
13-
IEnumerable<Type> integrationEventTypes,
1412
IntegrationEventLogContext eventLogContext,
1513
ILogger<IntegrationEventLogService>? logger = null)
1614
{
17-
_integrationEventTypes = integrationEventTypes;
1815
_eventLogContext = eventLogContext;
1916
_logger = logger;
2017
}
@@ -45,7 +42,7 @@ public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPub
4542
if (result.Any())
4643
{
4744
return result.OrderBy(e => e.CreationTime)
48-
.Select(e => e.DeserializeJsonContent(_integrationEventTypes.First(t => t.Name == e.EventTypeShortName)));
45+
.Select(e => e.DeserializeJsonContent());
4946
}
5047

5148
return result;
@@ -69,7 +66,7 @@ public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsPendingToPu
6966
if (result.Any())
7067
{
7168
return result.OrderBy(e => e.CreationTime)
72-
.Select(e => e.DeserializeJsonContent(_integrationEventTypes.First(t => t.Name == e.EventTypeShortName)));
69+
.Select(e => e.DeserializeJsonContent());
7370
}
7471

7572
return result;

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventOptionsExtensions.cs

-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ public static IIntegrationEventOptions UseEventLog<TDbContext>(
3030
option.DbContextType = typeof(TDbContext);
3131
});
3232

33-
var integrationEventTypes = options.Assemblies.SelectMany(assembly => assembly.GetTypes()).Where(type => type.IsClass &&typeof(IIntegrationEvent).IsAssignableFrom(type)).Distinct();
3433
options.Services.TryAddScoped<IIntegrationEventLogService>(serviceProvider => new IntegrationEventLogService(
35-
integrationEventTypes,
3634
serviceProvider.GetRequiredService<IntegrationEventLogContext>(),
3735
serviceProvider.GetService<ILogger<IntegrationEventLogService>>()));
3836

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ namespace Masa.Contrib.Dispatcher.IntegrationEvents;
55

66
public interface IPublisher
77
{
8-
Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent;
8+
Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default);
99
}

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/IntegrationEventLogItem.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ internal class IntegrationEventLogItem
1818

1919
public int RetryCount { get; private set; }
2020

21-
public IIntegrationEvent Event { get; private set; }
21+
public object Event { get; private set; }
2222

23-
public IntegrationEventLogItem(Guid eventId, string topic, IIntegrationEvent @event)
23+
public IntegrationEventLogItem(Guid eventId, string topic, object @event)
2424
{
2525
EventId = eventId;
2626
Topic = topic;

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync(
4949

5050
_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
5151
eventLog,
52-
eventLog.Event.Topic);
52+
eventLog.Topic);
5353

54-
await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken);
54+
await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken);
5555

5656
LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
5757

‎src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
4444

4545
_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
4646
eventLog,
47-
eventLog.Event.Topic);
47+
eventLog.Topic);
4848

49-
await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken);
49+
await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken);
5050

5151
await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);
5252
}
@@ -61,7 +61,7 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync(
6161
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
6262
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);
6363

64-
LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Event.Topic, eventLog.Event));
64+
LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event));
6565
}
6666
}
6767
}

‎src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs

+15-34
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ public void Initialize()
2626
public async Task TestRetrieveEventLogsPendingToPublishAsync(bool isUseLogger)
2727
{
2828
var integrationEventLogService =
29-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
30-
async eventLogContext => await InsertDataAsync(eventLogContext),
29+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext),
3130
isUseLogger
3231
);
3332
var list = await integrationEventLogService.RetrieveEventLogsPendingToPublishAsync(100, CancellationToken.None);
@@ -40,8 +39,7 @@ await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(Orde
4039
public async Task TestRetrieveEventLogsFailedToPublishAsync(bool isUseLogger)
4140
{
4241
var integrationEventLogService =
43-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
44-
async eventLogContext => await InsertDataAsync(eventLogContext),
42+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext),
4543
isUseLogger
4644
);
4745
var list = await integrationEventLogService.RetrieveEventLogsFailedToPublishAsync(100, 10, 1, CancellationToken.None);
@@ -54,8 +52,7 @@ await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(Orde
5452
public async Task TestRetrieveEventLogsFailed2ToPublishAsync(bool isUseLogger)
5553
{
5654
var integrationEventLogService =
57-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
58-
async eventLogContext =>
55+
await CreateIntegrationEventLogServiceAsync(async eventLogContext =>
5956
{
6057
await InsertDataAsync(eventLogContext);
6158
await InsertDataAsync(eventLogContext, IntegrationEventStates.Published);
@@ -74,8 +71,7 @@ await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(Orde
7471
public async Task TestSaveEventAsync(bool isUseLogger)
7572
{
7673
var integrationEventLogService =
77-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
78-
_ => Task.CompletedTask,
74+
await CreateIntegrationEventLogServiceAsync(_ => Task.CompletedTask,
7975
isUseLogger
8076
);
8177
var serviceProvider = _services.BuildServiceProvider();
@@ -104,8 +100,7 @@ public async Task TestMarkEventAsPublishedAsync(bool isUseLogger)
104100
{
105101
Guid eventId = default!;
106102
var integrationEventLogService =
107-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
108-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
103+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
109104
isUseLogger
110105
);
111106
var serviceProvider = _services.BuildServiceProvider();
@@ -123,8 +118,7 @@ public async Task TestMarkEventAsPublished2Async(bool isUseLogger)
123118
{
124119
Guid eventId = default!;
125120
var integrationEventLogService =
126-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
127-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
121+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
128122
isUseLogger
129123
);
130124
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
@@ -138,8 +132,7 @@ public async Task TestMarkEventAsInProgressAsync(bool isUseLogger)
138132
{
139133
Guid eventId = default!;
140134
var integrationEventLogService =
141-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
142-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext),
135+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext),
143136
isUseLogger
144137
);
145138
var serviceProvider = _services.BuildServiceProvider();
@@ -157,8 +150,7 @@ public async Task TestMarkEventAsInProgress2Async(bool isUseLogger)
157150
{
158151
Guid eventId = default!;
159152
var integrationEventLogService =
160-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
161-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
153+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
162154
isUseLogger
163155
);
164156
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
@@ -172,8 +164,7 @@ public async Task TestMarkEventAsInProgress3Async(bool isUseLogger)
172164
{
173165
Guid eventId = default!;
174166
var integrationEventLogService =
175-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
176-
async eventLogContext => eventId =
167+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId =
177168
await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress, DateTime.UtcNow.AddSeconds(1)),
178169
isUseLogger
179170
);
@@ -188,8 +179,7 @@ public async Task TestMarkEventAsFailedAsync(bool isUseLogger)
188179
{
189180
Guid eventId = default!;
190181
var integrationEventLogService =
191-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
192-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
182+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext, IntegrationEventStates.InProgress),
193183
isUseLogger
194184
);
195185
var serviceProvider = _services.BuildServiceProvider();
@@ -207,8 +197,7 @@ public async Task TestMarkEventAsFailed2Async(bool isUseLogger)
207197
{
208198
Guid eventId = default!;
209199
var integrationEventLogService =
210-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
211-
async eventLogContext => eventId = await InsertDataAsync(eventLogContext),
200+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => eventId = await InsertDataAsync(eventLogContext),
212201
isUseLogger
213202
);
214203
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
@@ -221,8 +210,7 @@ await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
221210
public async Task TestMarkEventAsFailed3Async(bool isUseLogger)
222211
{
223212
var integrationEventLogService =
224-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
225-
async eventLogContext => _ = await InsertDataAsync(eventLogContext),
213+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => _ = await InsertDataAsync(eventLogContext),
226214
isUseLogger
227215
);
228216
await Assert.ThrowsExceptionAsync<ArgumentException>(async ()
@@ -235,8 +223,7 @@ await Assert.ThrowsExceptionAsync<ArgumentException>(async ()
235223
public async Task TestDeleteExpiresAsync(bool isUseLogger)
236224
{
237225
var integrationEventLogService =
238-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
239-
async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
226+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.Published),
240227
isUseLogger
241228
);
242229
var serviceProvider = _services.BuildServiceProvider();
@@ -253,8 +240,7 @@ await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(Orde
253240
public async Task TestDeleteExpires2Async(bool isUseLogger)
254241
{
255242
var integrationEventLogService =
256-
await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(OrderPaymentSucceededIntegrationEvent).Assembly),
257-
async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
243+
await CreateIntegrationEventLogServiceAsync(async eventLogContext => await InsertDataAsync(eventLogContext, IntegrationEventStates.NotPublished),
258244
isUseLogger
259245
);
260246
var serviceProvider = _services.BuildServiceProvider();
@@ -266,7 +252,6 @@ await CreateIntegrationEventLogServiceAsync(GetIntegrationEventTypes(typeof(Orde
266252
}
267253

268254
private async Task<IntegrationEventLogService> CreateIntegrationEventLogServiceAsync(
269-
IEnumerable<Type> integrationEventTypes,
270255
Func<IntegrationEventLogContext, Task> func,
271256
bool isUseLogger)
272257
{
@@ -276,13 +261,9 @@ private async Task<IntegrationEventLogService> CreateIntegrationEventLogServiceA
276261
await integrationEventLogContext.DbContext.Database.EnsureCreatedAsync();
277262
await func.Invoke(integrationEventLogContext);
278263
var logger = isUseLogger ? serviceProvider.GetRequiredService<ILogger<IntegrationEventLogService>>() : null;
279-
return new IntegrationEventLogService(integrationEventTypes, integrationEventLogContext, logger);
264+
return new IntegrationEventLogService(integrationEventLogContext, logger);
280265
}
281266

282-
private static IEnumerable<Type> GetIntegrationEventTypes(params Assembly[] assemblies)
283-
=> assemblies.SelectMany(assembly => assembly.GetTypes())
284-
.Where(type => type.IsClass && typeof(IIntegrationEvent).IsAssignableFrom(type));
285-
286267
private static async Task<Guid> InsertDataAsync(
287268
IntegrationEventLogContext integrationEventLogContext,
288269
IntegrationEventStates? integrationEventStates = null,

‎src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/ProcessorTest.cs

+6-9
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,7 @@ public async Task RetryByDataProcessorExecuteTestAsync()
6767
new(@event, Guid.Empty),
6868
new(@event, Guid.Empty)
6969
};
70-
list.ForEach(item =>
71-
{
72-
item.DeserializeJsonContent(typeof(RegisterUserIntegrationEvent));
73-
});
70+
list.ForEach(item => { item.DeserializeJsonContent(); });
7471
integrationEventLogService.Setup(service =>
7572
service.RetrieveEventLogsFailedToPublishAsync(It.IsAny<int>(), It.IsAny<int>(), It.IsAny<int>(),
7673
cancellationTokenSource.Token))
@@ -131,26 +128,26 @@ public async Task RetryByDataProcessorExecute2TestAsync(bool useLogger)
131128
integrationEventLogService.Setup(service => service.MarkEventAsFailedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token))
132129
.Verifiable();
133130

134-
List<IntegrationEventLog> list = new List<IntegrationEventLog>()
131+
var list = new List<IntegrationEventLog>()
135132
{
136133
new(new RegisterUserIntegrationEvent(), Guid.Empty),
137134
new(new PaySuccessedIntegrationEvent(Guid.NewGuid().ToString()), Guid.Empty)
138135
};
139136
for (int index = 0; index < list.Count; index++)
140137
{
141138
if (index == 0)
142-
list[index].DeserializeJsonContent(typeof(RegisterUserIntegrationEvent));
139+
list[index].DeserializeJsonContent();
143140
else
144-
list[index].DeserializeJsonContent(typeof(PaySuccessedIntegrationEvent));
141+
list[index].DeserializeJsonContent();
145142
}
146143

147144
Mock<IPublisher> publisher = new();
148145
publisher.Setup(client
149-
=> client.PublishAsync(nameof(RegisterUserIntegrationEvent), It.IsAny<IIntegrationEvent>(),
146+
=> client.PublishAsync(nameof(RegisterUserIntegrationEvent), It.IsAny<object>(),
150147
cancellationTokenSource.Token))
151148
.Throws(new Exception("custom exception"));
152149
publisher.Setup(client
153-
=> client.PublishAsync(nameof(PaySuccessedIntegrationEvent), It.IsAny<IIntegrationEvent>(),
150+
=> client.PublishAsync(nameof(PaySuccessedIntegrationEvent), It.IsAny<object>(),
154151
cancellationTokenSource.Token))
155152
.Throws(new UserFriendlyException("custom exception"));
156153
services.AddScoped(_ => publisher.Object);

‎src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/SendByDataProcessorTest.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public async Task TestPublishEventByPublishIsFailedAsync()
2727
};
2828
foreach (var log in eventLogs)
2929
{
30-
log.DeserializeJsonContent(typeof(RegisterUserEvent));
30+
log.DeserializeJsonContent();
3131
}
3232
logService
3333
.Setup(l => l.RetrieveEventLogsPendingToPublishAsync(20, default))
@@ -52,7 +52,7 @@ public async Task TestPublishEventByPublishIsFailedAsync()
5252
logService.Verify(l => l.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), default), Times.Once);
5353
logService.Verify(l => l.MarkEventAsPublishedAsync(It.IsAny<Guid>(), default), Times.Never);
5454
logService.Verify(l => l.MarkEventAsFailedAsync(It.IsAny<Guid>(), default), Times.Once);
55-
publisher.Verify(p => p.PublishAsync(It.IsAny<string>(), It.IsAny<IIntegrationEvent>(), default), Times.Once);
55+
publisher.Verify(p => p.PublishAsync(It.IsAny<string>(), It.IsAny<object>(), default), Times.Once);
5656
}
5757

5858
[TestMethod]
@@ -65,7 +65,7 @@ public async Task TestPublishEventByPublishIsSuccessedAsync()
6565
};
6666
foreach (var log in eventLogs)
6767
{
68-
log.DeserializeJsonContent(typeof(RegisterUserEvent));
68+
log.DeserializeJsonContent();
6969
}
7070
logService
7171
.Setup(l => l.RetrieveEventLogsPendingToPublishAsync(20, default))
@@ -90,6 +90,6 @@ public async Task TestPublishEventByPublishIsSuccessedAsync()
9090
logService.Verify(l => l.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), default), Times.Once);
9191
logService.Verify(l => l.MarkEventAsPublishedAsync(It.IsAny<Guid>(), default), Times.Once);
9292
logService.Verify(l => l.MarkEventAsFailedAsync(It.IsAny<Guid>(), default), Times.Never);
93-
publisher.Verify(p => p.PublishAsync(It.IsAny<string>(), It.IsAny<IIntegrationEvent>(), default), Times.Once);
93+
publisher.Verify(p => p.PublishAsync(It.IsAny<string>(), It.IsAny<object>(), default), Times.Once);
9494
}
9595
}

‎test/Masa.Framework.IntegrationTests.EventBus/Infrastructure/Extensions/DefaultPublisher.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace Masa.Framework.IntegrationTests.EventBus.Infrastructure.Extensions;
55

66
public class DefaultPublisher : IPublisher
77
{
8-
public Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
8+
public Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default)
99
{
1010
return Task.CompletedTask;
1111
}

0 commit comments

Comments
 (0)
Please sign in to comment.