Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(dispatcher): add IDistributedDispatcherOptions and add IEventBusBuild #22

Merged
merged 8 commits into from
Mar 28, 2022
4 changes: 2 additions & 2 deletions src/Data/Masa.Contrib.Data.UoW.EF/UnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default)
await Context.Database.RollbackTransactionAsync(cancellationToken);
}

public ValueTask DisposeAsync() => Context.DisposeAsync();
public async ValueTask DisposeAsync() => await (_context?.DisposeAsync() ?? ValueTask.CompletedTask);

public void Dispose() => Context.Dispose();
public void Dispose() => _context?.Dispose();
}
46 changes: 23 additions & 23 deletions src/Ddd/Masa.Contrib.Ddd.Domain.Repository.EF/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ public class Repository<TDbContext, TEntity> :
where TEntity : class, IEntity
where TDbContext : DbContext
{
protected readonly TDbContext _context;
protected readonly TDbContext Context;

public Repository(TDbContext context, IUnitOfWork unitOfWork)
{
_context = context;
Context = context;
UnitOfWork = unitOfWork;
}

public override bool TransactionHasBegun
=> _context.Database.CurrentTransaction != null;
=> Context.Database.CurrentTransaction != null;

public override DbTransaction Transaction
{
Expand All @@ -24,9 +24,9 @@ public override DbTransaction Transaction
throw new NotSupportedException(nameof(Transaction));

if (TransactionHasBegun)
return _context.Database.CurrentTransaction!.GetDbTransaction();
return Context.Database.CurrentTransaction!.GetDbTransaction();

return _context.Database.BeginTransaction().GetDbTransaction();
return Context.Database.BeginTransaction().GetDbTransaction();
}
}

Expand All @@ -53,7 +53,7 @@ public override async ValueTask<TEntity> AddAsync(
TEntity entity,
CancellationToken cancellationToken = default)
{
var response = (await _context.AddAsync(entity, cancellationToken).AsTask()).Entity;
var response = (await Context.AddAsync(entity, cancellationToken).AsTask()).Entity;
EntityState = EntityState.Changed;
return response;
}
Expand All @@ -62,45 +62,45 @@ public override async Task AddRangeAsync(
IEnumerable<TEntity> entities,
CancellationToken cancellationToken = default)
{
await _context.AddRangeAsync(entities, cancellationToken);
await Context.AddRangeAsync(entities, cancellationToken);
EntityState = EntityState.Changed;
}

public override Task CommitAsync(CancellationToken cancellationToken = default)
=> UnitOfWork.CommitAsync(cancellationToken);

public override async ValueTask DisposeAsync() => await _context.DisposeAsync();
public override async ValueTask DisposeAsync() => await Context.DisposeAsync();

public override void Dispose() => _context.Dispose();
public override void Dispose() => Context.Dispose();

public override Task<TEntity?> FindAsync(
IEnumerable<KeyValuePair<string, object>> keyValues,
CancellationToken cancellationToken = default)
{
Dictionary<string, object> fields = new(keyValues);
return _context.Set<TEntity>().IgnoreQueryFilters().GetQueryable(fields).FirstOrDefaultAsync(cancellationToken);
return Context.Set<TEntity>().IgnoreQueryFilters().GetQueryable(fields).FirstOrDefaultAsync(cancellationToken);
}

public override Task<TEntity?> FindAsync(
Expression<Func<TEntity, bool>> predicate,
CancellationToken cancellationToken = default)
=> _context.Set<TEntity>().Where(predicate).FirstOrDefaultAsync(cancellationToken);
=> Context.Set<TEntity>().Where(predicate).FirstOrDefaultAsync(cancellationToken);

public override async Task<long> GetCountAsync(CancellationToken cancellationToken = default)
=> await _context.Set<TEntity>().LongCountAsync(cancellationToken);
=> await Context.Set<TEntity>().LongCountAsync(cancellationToken);

public override Task<long> GetCountAsync(
Expression<Func<TEntity, bool>> predicate,
CancellationToken cancellationToken = default)
=> _context.Set<TEntity>().LongCountAsync(predicate, cancellationToken);
=> Context.Set<TEntity>().LongCountAsync(predicate, cancellationToken);

public override async Task<IEnumerable<TEntity>> GetListAsync(CancellationToken cancellationToken = default)
=> await _context.Set<TEntity>().ToListAsync(cancellationToken);
=> await Context.Set<TEntity>().ToListAsync(cancellationToken);

public override async Task<IEnumerable<TEntity>> GetListAsync(
Expression<Func<TEntity, bool>> predicate,
CancellationToken cancellationToken = default)
=> await _context.Set<TEntity>().Where(predicate).ToListAsync(cancellationToken);
=> await Context.Set<TEntity>().Where(predicate).ToListAsync(cancellationToken);

/// <summary>
///
Expand All @@ -118,7 +118,7 @@ public override Task<List<TEntity>> GetPaginatedListAsync(
{
sorting ??= new Dictionary<string, bool>();

return _context.Set<TEntity>().OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
return Context.Set<TEntity>().OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
}

/// <summary>
Expand All @@ -139,12 +139,12 @@ public override Task<List<TEntity>> GetPaginatedListAsync(
{
sorting ??= new Dictionary<string, bool>();

return _context.Set<TEntity>().Where(predicate).OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
return Context.Set<TEntity>().Where(predicate).OrderBy(sorting).Skip(skip).Take(take).ToListAsync(cancellationToken);
}

public override Task<TEntity> RemoveAsync(TEntity entity, CancellationToken cancellationToken = default)
{
_context.Set<TEntity>().Remove(entity);
Context.Set<TEntity>().Remove(entity);
EntityState = EntityState.Changed;
return Task.FromResult(entity);
}
Expand All @@ -153,12 +153,12 @@ public override async Task RemoveAsync(Expression<Func<TEntity, bool>> predicate
{
var entities = await GetListAsync(predicate, cancellationToken);
EntityState = EntityState.Changed;
_context.Set<TEntity>().RemoveRange(entities);
Context.Set<TEntity>().RemoveRange(entities);
}

public override Task RemoveRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
{
_context.Set<TEntity>().RemoveRange(entities);
Context.Set<TEntity>().RemoveRange(entities);
EntityState = EntityState.Changed;
return Task.CompletedTask;
}
Expand All @@ -173,14 +173,14 @@ public override async Task SaveChangesAsync(CancellationToken cancellationToken

public override Task<TEntity> UpdateAsync(TEntity entity, CancellationToken cancellationToken = default)
{
_context.Set<TEntity>().Update(entity);
Context.Set<TEntity>().Update(entity);
EntityState = EntityState.Changed;
return Task.FromResult(entity);
}

public override Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
{
_context.Set<TEntity>().UpdateRange(entities);
Context.Set<TEntity>().UpdateRange(entities);
EntityState = EntityState.Changed;
return Task.CompletedTask;
}
Expand Down Expand Up @@ -213,5 +213,5 @@ public Repository(TDbContext context, IUnitOfWork unitOfWork) : base(context, un
}

public Task<TEntity?> FindAsync(TKey id)
=> _context.Set<TEntity>().FirstOrDefaultAsync(entity => entity.Id.Equals(id));
=> Context.Set<TEntity>().FirstOrDefaultAsync(entity => entity.Id.Equals(id));
}
4 changes: 2 additions & 2 deletions src/Ddd/Masa.Contrib.Ddd.Domain/DomainEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ namespace Masa.Contrib.Ddd.Domain;

public class DomainEventBus : IDomainEventBus
{
protected readonly IEventBus _eventBus;
protected readonly IIntegrationEventBus _integrationEventBus;
private readonly IEventBus _eventBus;
private readonly IIntegrationEventBus _integrationEventBus;
private readonly IUnitOfWork _unitOfWork;
private readonly DispatcherOptions _options;

Expand Down
4 changes: 2 additions & 2 deletions src/Ddd/Masa.Contrib.Ddd.Domain/Options/DispatcherOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Masa.Contrib.Ddd.Domain;
namespace Masa.Contrib.Ddd.Domain.Options;

public class DispatcherOptions : IDispatcherOptions
public class DispatcherOptions : IDistributedDispatcherOptions
{
public IServiceCollection Services { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static IServiceCollection AddDomainEventBus(

var dispatcherOptions = new DispatcherOptions(services, assemblies);
options?.Invoke(dispatcherOptions);
services.AddSingleton(typeof(IOptions<DispatcherOptions>), _ => Options.Create(dispatcherOptions));
services.AddSingleton(typeof(IOptions<DispatcherOptions>), _ => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));

if (services.All(service => service.ServiceType != typeof(IEventBus)))
throw new Exception("Please add EventBus first.");
Expand Down
1 change: 1 addition & 0 deletions src/Ddd/Masa.Contrib.Ddd.Domain/_Imports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
global using Masa.BuildingBlocks.Dispatcher.Events;
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
global using Masa.Contrib.Ddd.Domain.Internal;
global using Masa.Contrib.Ddd.Domain.Options;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Options;
Expand Down
12 changes: 11 additions & 1 deletion src/Dispatcher/Masa.Contrib.Dispatcher.Events/EventBusBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
namespace Masa.Contrib.Dispatcher.Events;

public class EventBusBuilder
public class EventBusBuilder: IEventBusBuilder
{
public IServiceCollection Services { get; }

public EventBusBuilder(IServiceCollection services) => Services = services;

public IEventBusBuilder UseMiddleware(Type middleware, ServiceLifetime middlewareLifetime = ServiceLifetime.Transient)
{
if (!typeof(IMiddleware<>).IsGenericInterfaceAssignableFrom(middleware))
throw new ArgumentException($"{middleware.Name} doesn't implement IMiddleware<>");

var descriptor = new ServiceDescriptor(typeof(IMiddleware<>), middleware, middlewareLifetime);
Services.TryAddEnumerable(descriptor);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace Masa.Contrib.Dispatcher.Events.Options;

public class DispatcherOptions : IDispatcherOptions
public class DispatcherOptions
{
public IServiceCollection Services { get; }
private IServiceCollection Services { get; }

public Assembly[] Assemblies { get; }
private Assembly[] Assemblies { get; }

private bool IsSupportUnitOfWork(Type eventType)
=> typeof(ITransaction).IsAssignableFrom(eventType) && !typeof(IDomainQuery<>).IsGenericInterfaceAssignableFrom(eventType);
Expand All @@ -28,11 +28,4 @@ public DispatcherOptions(IServiceCollection services, Assembly[] assemblies)
.ToList();
UnitOfWorkRelation = AllEventTypes.ToDictionary(type => type, IsSupportUnitOfWork);
}

public DispatcherOptions UseMiddleware(Type middleware, ServiceLifetime middlewareLifetime = ServiceLifetime.Scoped)
{
var descriptor = new ServiceDescriptor(typeof(IMiddleware<>), middleware, middlewareLifetime);
Services.TryAddEnumerable(descriptor);
return this;
}
}
2 changes: 1 addition & 1 deletion src/Dispatcher/Masa.Contrib.Dispatcher.Events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public class LoggingMiddleware<TEvent>


```C#
builder.Services.AddEventBus(options => options.UseMiddleware(typeof(ValidatorMiddleware<>)));
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
```

4. Support Transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public class LoggingMiddleware<TEvent>
2. 启用自定义Middleware

```C#
builder.Services.AddEventBus(options => options.UseMiddleware(typeof(ValidatorMiddleware<>)));
builder.Services.AddEventBus(eventBusBuilder => eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
```

4. 支持Transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,33 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddEventBus(
this IServiceCollection services,
Action<DispatcherOptions>? options = null)
=> services.AddEventBus(AppDomain.CurrentDomain.GetAssemblies(), options);
Action<EventBusBuilder>? eventBusBuilder = null)
=> services.AddEventBus(AppDomain.CurrentDomain.GetAssemblies(), eventBusBuilder);

public static IServiceCollection AddEventBus(
this IServiceCollection services,
Assembly[] assemblies,
Action<DispatcherOptions>? options = null)
=> services.AddEventBus(assemblies, ServiceLifetime.Scoped, options);
Action<EventBusBuilder>? eventBusBuilder = null)
=> services.AddEventBus(assemblies, ServiceLifetime.Scoped, eventBusBuilder);

public static IServiceCollection AddEventBus(
this IServiceCollection services,
Assembly[] assemblies,
ServiceLifetime lifetime,
Action<DispatcherOptions>? options = null)
Action<EventBusBuilder>? eventBusBuilder = null)
{
if (services.Any(service => service.ImplementationType == typeof(EventBusProvider)))
return services;

services.AddSingleton<EventBusProvider>();

eventBusBuilder?.Invoke(new EventBusBuilder(services));

DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
options?.Invoke(dispatcherOptions);
services.AddSingleton(typeof(IOptions<DispatcherOptions>),
_ => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));

services.AddSingleton(new SagaDispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
services.AddSingleton(new SagaDispatcher(services, assemblies).Build(lifetime));
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, assemblies).Build(lifetime));
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
services.AddTransient(typeof(IMiddleware<>), typeof(TransactionMiddleware<>));
services.AddScoped(typeof(IEventBus), typeof(EventBus));
Expand All @@ -41,20 +41,20 @@ public static IServiceCollection AddTestEventBus(
this IServiceCollection services,
Assembly[] assemblies,
ServiceLifetime lifetime,
Action<DispatcherOptions>? options = null)
Action<EventBusBuilder>? eventBusBuilder = null)
{
if (services.Any(service => service.ImplementationType == typeof(EventBusProvider)))
return services;

services.AddSingleton<EventBusProvider>();

DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
options?.Invoke(dispatcherOptions);
eventBusBuilder?.Invoke(new EventBusBuilder(services));

DispatcherOptions dispatcherOptions = new DispatcherOptions(services, assemblies);
services.AddSingleton(typeof(IOptions<DispatcherOptions>),
serviceProvider => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));
services.AddSingleton(new SagaDispatcher(services, dispatcherOptions.Assemblies, true).Build(lifetime));
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, dispatcherOptions.Assemblies).Build(lifetime));
services.AddSingleton(new SagaDispatcher(services, assemblies, true).Build(lifetime));
services.AddSingleton(new Internal.Dispatch.Dispatcher(services, assemblies).Build(lifetime));
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
services.AddTransient(typeof(IMiddleware<>), typeof(TransactionMiddleware<>));
services.AddScoped(typeof(IEventBus), typeof(EventBus));
Expand All @@ -64,6 +64,5 @@ public static IServiceCollection AddTestEventBus(

private class EventBusProvider
{

}
}

This file was deleted.

Loading