Skip to content

Commit d3d2ca4

Browse files
authored
feature(dispatcher): add IDistributedDispatcherOptions and add IEventBusBuild (#22)
* refactor(Dispatcher): Adjust the usage of EventBus and IntegrationEventBus, and change the state of the local message table to be processed by sql statement * test(dispatcher): Adjust the dispatcher unit test to adapt to the new writing method * chore(Repository.EF): change _context To Context * fix(UoW): Fix UnitOfWork release DbContext problem * refactor(IntegrationEvents.Dapr): adjust AddDaprEventBus methods parameter * chore(IntegrationEvents.Dapr): adjust UseDaprEventBus methods parameter * chore: Remove dead code * chore: Modify the EfCommon class name to DbContextExtensions
1 parent 0170f57 commit d3d2ca4

File tree

33 files changed

+495
-266
lines changed

33 files changed

+495
-266
lines changed

src/Data/Masa.Contrib.Data.UoW.EF/UnitOfWork.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default)
6161
await Context.Database.RollbackTransactionAsync(cancellationToken);
6262
}
6363

64-
public ValueTask DisposeAsync() => Context.DisposeAsync();
64+
public async ValueTask DisposeAsync() => await (_context?.DisposeAsync() ?? ValueTask.CompletedTask);
6565

66-
public void Dispose() => Context.Dispose();
66+
public void Dispose() => _context?.Dispose();
6767
}

src/Ddd/Masa.Contrib.Ddd.Domain.Repository.EF/Repository.cs

+23-23
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ public class Repository<TDbContext, TEntity> :
55
where TEntity : class, IEntity
66
where TDbContext : DbContext
77
{
8-
protected readonly TDbContext _context;
8+
protected readonly TDbContext Context;
99

1010
public Repository(TDbContext context, IUnitOfWork unitOfWork)
1111
{
12-
_context = context;
12+
Context = context;
1313
UnitOfWork = unitOfWork;
1414
}
1515

1616
public override bool TransactionHasBegun
17-
=> _context.Database.CurrentTransaction != null;
17+
=> Context.Database.CurrentTransaction != null;
1818

1919
public override DbTransaction Transaction
2020
{
@@ -24,9 +24,9 @@ public override DbTransaction Transaction
2424
throw new NotSupportedException(nameof(Transaction));
2525

2626
if (TransactionHasBegun)
27-
return _context.Database.CurrentTransaction!.GetDbTransaction();
27+
return Context.Database.CurrentTransaction!.GetDbTransaction();
2828

29-
return _context.Database.BeginTransaction().GetDbTransaction();
29+
return Context.Database.BeginTransaction().GetDbTransaction();
3030
}
3131
}
3232

@@ -53,7 +53,7 @@ public override async ValueTask<TEntity> AddAsync(
5353
TEntity entity,
5454
CancellationToken cancellationToken = default)
5555
{
56-
var response = (await _context.AddAsync(entity, cancellationToken).AsTask()).Entity;
56+
var response = (await Context.AddAsync(entity, cancellationToken).AsTask()).Entity;
5757
EntityState = EntityState.Changed;
5858
return response;
5959
}
@@ -62,45 +62,45 @@ public override async Task AddRangeAsync(
6262
IEnumerable<TEntity> entities,
6363
CancellationToken cancellationToken = default)
6464
{
65-
await _context.AddRangeAsync(entities, cancellationToken);
65+
await Context.AddRangeAsync(entities, cancellationToken);
6666
EntityState = EntityState.Changed;
6767
}
6868

6969
public override Task CommitAsync(CancellationToken cancellationToken = default)
7070
=> UnitOfWork.CommitAsync(cancellationToken);
7171

72-
public override async ValueTask DisposeAsync() => await _context.DisposeAsync();
72+
public override async ValueTask DisposeAsync() => await Context.DisposeAsync();
7373

74-
public override void Dispose() => _context.Dispose();
74+
public override void Dispose() => Context.Dispose();
7575

7676
public override Task<TEntity?> FindAsync(
7777
IEnumerable<KeyValuePair<string, object>> keyValues,
7878
CancellationToken cancellationToken = default)
7979
{
8080
Dictionary<string, object> fields = new(keyValues);
81-
return _context.Set<TEntity>().IgnoreQueryFilters().GetQueryable(fields).FirstOrDefaultAsync(cancellationToken);
81+
return Context.Set<TEntity>().IgnoreQueryFilters().GetQueryable(fields).FirstOrDefaultAsync(cancellationToken);
8282
}
8383

8484
public override Task<TEntity?> FindAsync(
8585
Expression<Func<TEntity, bool>> predicate,
8686
CancellationToken cancellationToken = default)
87-
=> _context.Set<TEntity>().Where(predicate).FirstOrDefaultAsync(cancellationToken);
87+
=> Context.Set<TEntity>().Where(predicate).FirstOrDefaultAsync(cancellationToken);
8888

8989
public override async Task<long> GetCountAsync(CancellationToken cancellationToken = default)
90-
=> await _context.Set<TEntity>().LongCountAsync(cancellationToken);
90+
=> await Context.Set<TEntity>().LongCountAsync(cancellationToken);
9191

9292
public override Task<long> GetCountAsync(
9393
Expression<Func<TEntity, bool>> predicate,
9494
CancellationToken cancellationToken = default)
95-
=> _context.Set<TEntity>().LongCountAsync(predicate, cancellationToken);
95+
=> Context.Set<TEntity>().LongCountAsync(predicate, cancellationToken);
9696

9797
public override async Task<IEnumerable<TEntity>> GetListAsync(CancellationToken cancellationToken = default)
98-
=> await _context.Set<TEntity>().ToListAsync(cancellationToken);
98+
=> await Context.Set<TEntity>().ToListAsync(cancellationToken);
9999

100100
public override async Task<IEnumerable<TEntity>> GetListAsync(
101101
Expression<Func<TEntity, bool>> predicate,
102102
CancellationToken cancellationToken = default)
103-
=> await _context.Set<TEntity>().Where(predicate).ToListAsync(cancellationToken);
103+
=> await Context.Set<TEntity>().Where(predicate).ToListAsync(cancellationToken);
104104

105105
/// <summary>
106106
///
@@ -118,7 +118,7 @@ public override Task<List<TEntity>> GetPaginatedListAsync(
118118
{
119119
sorting ??= new Dictionary<string, bool>();
120120

121-
return _context.Set<TEntity>().OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
121+
return Context.Set<TEntity>().OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
122122
}
123123

124124
/// <summary>
@@ -139,12 +139,12 @@ public override Task<List<TEntity>> GetPaginatedListAsync(
139139
{
140140
sorting ??= new Dictionary<string, bool>();
141141

142-
return _context.Set<TEntity>().Where(predicate).OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
142+
return Context.Set<TEntity>().Where(predicate).OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
143143
}
144144

145145
public override Task<TEntity> RemoveAsync(TEntity entity, CancellationToken cancellationToken = default)
146146
{
147-
_context.Set<TEntity>().Remove(entity);
147+
Context.Set<TEntity>().Remove(entity);
148148
EntityState = EntityState.Changed;
149149
return Task.FromResult(entity);
150150
}
@@ -153,12 +153,12 @@ public override async Task RemoveAsync(Expression<Func<TEntity, bool>> predicate
153153
{
154154
var entities = await GetListAsync(predicate, cancellationToken);
155155
EntityState = EntityState.Changed;
156-
_context.Set<TEntity>().RemoveRange(entities);
156+
Context.Set<TEntity>().RemoveRange(entities);
157157
}
158158

159159
public override Task RemoveRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
160160
{
161-
_context.Set<TEntity>().RemoveRange(entities);
161+
Context.Set<TEntity>().RemoveRange(entities);
162162
EntityState = EntityState.Changed;
163163
return Task.CompletedTask;
164164
}
@@ -173,14 +173,14 @@ public override async Task SaveChangesAsync(CancellationToken cancellationToken
173173

174174
public override Task<TEntity> UpdateAsync(TEntity entity, CancellationToken cancellationToken = default)
175175
{
176-
_context.Set<TEntity>().Update(entity);
176+
Context.Set<TEntity>().Update(entity);
177177
EntityState = EntityState.Changed;
178178
return Task.FromResult(entity);
179179
}
180180

181181
public override Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
182182
{
183-
_context.Set<TEntity>().UpdateRange(entities);
183+
Context.Set<TEntity>().UpdateRange(entities);
184184
EntityState = EntityState.Changed;
185185
return Task.CompletedTask;
186186
}
@@ -213,5 +213,5 @@ public Repository(TDbContext context, IUnitOfWork unitOfWork) : base(context, un
213213
}
214214

215215
public Task<TEntity?> FindAsync(TKey id)
216-
=> _context.Set<TEntity>().FirstOrDefaultAsync(entity => entity.Id.Equals(id));
216+
=> Context.Set<TEntity>().FirstOrDefaultAsync(entity => entity.Id.Equals(id));
217217
}

src/Ddd/Masa.Contrib.Ddd.Domain/DomainEventBus.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ namespace Masa.Contrib.Ddd.Domain;
22

33
public class DomainEventBus : IDomainEventBus
44
{
5-
protected readonly IEventBus _eventBus;
6-
protected readonly IIntegrationEventBus _integrationEventBus;
5+
private readonly IEventBus _eventBus;
6+
private readonly IIntegrationEventBus _integrationEventBus;
77
private readonly IUnitOfWork _unitOfWork;
88
private readonly DispatcherOptions _options;
99

src/Ddd/Masa.Contrib.Ddd.Domain/Options/DispatcherOptions.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
namespace Masa.Contrib.Ddd.Domain;
1+
namespace Masa.Contrib.Ddd.Domain.Options;
22

3-
public class DispatcherOptions : IDispatcherOptions
3+
public class DispatcherOptions : IDistributedDispatcherOptions
44
{
55
public IServiceCollection Services { get; }
66

src/Ddd/Masa.Contrib.Ddd.Domain/ServiceCollectionExtensions.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static IServiceCollection AddDomainEventBus(
1919

2020
var dispatcherOptions = new DispatcherOptions(services, assemblies);
2121
options?.Invoke(dispatcherOptions);
22-
services.AddSingleton(typeof(IOptions<DispatcherOptions>), _ => Options.Create(dispatcherOptions));
22+
services.AddSingleton(typeof(IOptions<DispatcherOptions>), _ => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));
2323

2424
if (services.All(service => service.ServiceType != typeof(IEventBus)))
2525
throw new Exception("Please add EventBus first.");

src/Ddd/Masa.Contrib.Ddd.Domain/_Imports.cs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
global using Masa.BuildingBlocks.Dispatcher.Events;
77
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
88
global using Masa.Contrib.Ddd.Domain.Internal;
9+
global using Masa.Contrib.Ddd.Domain.Options;
910
global using Microsoft.Extensions.DependencyInjection;
1011
global using Microsoft.Extensions.DependencyInjection.Extensions;
1112
global using Microsoft.Extensions.Options;
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
11
namespace Masa.Contrib.Dispatcher.Events;
22

3-
public class EventBusBuilder
3+
public class EventBusBuilder: IEventBusBuilder
44
{
55
public IServiceCollection Services { get; }
66

77
public EventBusBuilder(IServiceCollection services) => Services = services;
8+
9+
public IEventBusBuilder UseMiddleware(Type middleware, ServiceLifetime middlewareLifetime = ServiceLifetime.Transient)
10+
{
11+
if (!typeof(IMiddleware<>).IsGenericInterfaceAssignableFrom(middleware))
12+
throw new ArgumentException($"{middleware.Name} doesn't implement IMiddleware<>");
13+
14+
var descriptor = new ServiceDescriptor(typeof(IMiddleware<>), middleware, middlewareLifetime);
15+
Services.TryAddEnumerable(descriptor);
16+
return this;
17+
}
818
}

src/Dispatcher/Masa.Contrib.Dispatcher.Events/Options/DispatcherOptions.cs

+3-10
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
namespace Masa.Contrib.Dispatcher.Events.Options;
22

3-
public class DispatcherOptions : IDispatcherOptions
3+
public class DispatcherOptions
44
{
5-
public IServiceCollection Services { get; }
5+
private IServiceCollection Services { get; }
66

7-
public Assembly[] Assemblies { get; }
7+
private Assembly[] Assemblies { get; }
88

99
private bool IsSupportUnitOfWork(Type eventType)
1010
=> typeof(ITransaction).IsAssignableFrom(eventType) && !typeof(IDomainQuery<>).IsGenericInterfaceAssignableFrom(eventType);
@@ -28,11 +28,4 @@ public DispatcherOptions(IServiceCollection services, Assembly[] assemblies)
2828
.ToList();
2929
UnitOfWorkRelation = AllEventTypes.ToDictionary(type => type, IsSupportUnitOfWork);
3030
}
31-
32-
public DispatcherOptions UseMiddleware(Type middleware, ServiceLifetime middlewareLifetime = ServiceLifetime.Scoped)
33-
{
34-
var descriptor = new ServiceDescriptor(typeof(IMiddleware<>), middleware, middlewareLifetime);
35-
Services.TryAddEnumerable(descriptor);
36-
return this;
37-
}
3831
}

src/Dispatcher/Masa.Contrib.Dispatcher.Events/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public class LoggingMiddleware<TEvent>
166166

167167

168168
```C#
169-
builder.Services.AddEventBus(options => options.UseMiddleware(typeof(ValidatorMiddleware<>)));
169+
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
170170
```
171171

172172
4. Support Transaction

src/Dispatcher/Masa.Contrib.Dispatcher.Events/README.zh-CN.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public class LoggingMiddleware<TEvent>
165165
2. 启用自定义Middleware
166166

167167
```C#
168-
builder.Services.AddEventBus(options => options.UseMiddleware(typeof(ValidatorMiddleware<>)));
168+
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
169169
```
170170

171171
4. 支持Transaction

src/Dispatcher/Masa.Contrib.Dispatcher.Events/ServiceCollectionExtensions.cs

+14-15
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,33 @@ public static class ServiceCollectionExtensions
44
{
55
public static IServiceCollection AddEventBus(
66
this IServiceCollection services,
7-
Action<DispatcherOptions>? options = null)
8-
=> services.AddEventBus(AppDomain.CurrentDomain.GetAssemblies(), options);
7+
Action<EventBusBuilder>? eventBusBuilder = null)
8+
=> services.AddEventBus(AppDomain.CurrentDomain.GetAssemblies(), eventBusBuilder);
99

1010
public static IServiceCollection AddEventBus(
1111
this IServiceCollection services,
1212
Assembly[] assemblies,
13-
Action<DispatcherOptions>? options = null)
14-
=> services.AddEventBus(assemblies, ServiceLifetime.Scoped, options);
13+
Action<EventBusBuilder>? eventBusBuilder = null)
14+
=> services.AddEventBus(assemblies, ServiceLifetime.Scoped, eventBusBuilder);
1515

1616
public static IServiceCollection AddEventBus(
1717
this IServiceCollection services,
1818
Assembly[] assemblies,
1919
ServiceLifetime lifetime,
20-
Action<DispatcherOptions>? options = null)
20+
Action<EventBusBuilder>? eventBusBuilder = null)
2121
{
2222
if (services.Any(service => service.ImplementationType == typeof(EventBusProvider)))
2323
return services;
2424

2525
services.AddSingleton<EventBusProvider>();
2626

27+
eventBusBuilder?.Invoke(new EventBusBuilder(services));
28+
2729
DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
28-
options?.Invoke(dispatcherOptions);
2930
services.AddSingleton(typeof(IOptions<DispatcherOptions>),
3031
_ => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));
31-
32-
services.AddSingleton(new SagaDispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
33-
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
32+
services.AddSingleton(new SagaDispatcher(services, assemblies).Build(lifetime));
33+
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, assemblies).Build(lifetime));
3434
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
3535
services.AddTransient(typeof(IMiddleware<>), typeof(TransactionMiddleware<>));
3636
services.AddScoped(typeof(IEventBus), typeof(EventBus));
@@ -41,20 +41,20 @@ public static IServiceCollection AddTestEventBus(
4141
this IServiceCollection services,
4242
Assembly[] assemblies,
4343
ServiceLifetime lifetime,
44-
Action<DispatcherOptions>? options = null)
44+
Action<EventBusBuilder>? eventBusBuilder = null)
4545
{
4646
if (services.Any(service => service.ImplementationType == typeof(EventBusProvider)))
4747
return services;
4848

4949
services.AddSingleton<EventBusProvider>();
5050

51-
DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
52-
options?.Invoke(dispatcherOptions);
51+
eventBusBuilder?.Invoke(new EventBusBuilder(services));
5352

53+
DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
5454
services.AddSingleton(typeof(IOptions<DispatcherOptions>),
5555
serviceProvider => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));
56-
services.AddSingleton(new SagaDispatcher(services, dispatcherOptions.Assemblies, true).Build(lifetime));
57-
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
56+
services.AddSingleton(new SagaDispatcher(services, assemblies, true).Build(lifetime));
57+
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, assemblies).Build(lifetime));
5858
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
5959
services.AddTransient(typeof(IMiddleware<>), typeof(TransactionMiddleware<>));
6060
services.AddScoped(typeof(IEventBus), typeof(EventBus));
@@ -64,6 +64,5 @@ public static IServiceCollection AddTestEventBus(
6464

6565
private class EventBusProvider
6666
{
67-
6867
}
6968
}

src/Dispatcher/Masa.Contrib.Dispatcher.Events/UseMiddlewareExtensions.cs

-17
This file was deleted.

0 commit comments

Comments
 (0)