Skip to content

Commit

Permalink
Merge pull request #126 from netcorepal/command-lock
Browse files Browse the repository at this point in the history
feat:commandlock
  • Loading branch information
witskeeper authored Jan 7, 2025
2 parents 52362ab + 33ec107 commit eb1ce4e
Show file tree
Hide file tree
Showing 30 changed files with 927 additions and 105 deletions.
67 changes: 67 additions & 0 deletions docs/content/concurrency/command-lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# 命令锁
顾名思义,命令锁是为了解决命令并发执行的问题。在一些场景下,我们需要保证某个命令在同一时间只能被一个实例执行,这时候就可以使用命令锁。
本质上命令锁是一种分布式锁,它的实现方式有很多种,我们默认提供了基于Redis来实现的命令锁。

## 注册命令锁

在Program.cs注册`CommandLocks`
```csharp
builder.Services.AddMediatR(cfg =>
cfg.RegisterServicesFromAssemblies(Assembly.GetExecutingAssembly())
.AddCommandLockBehavior() //注册命令锁行为
.AddUnitOfWorkBehaviors()
.AddKnownExceptionValidationBehavior());

builder.Services.AddCommandLocks(typeof(Program).Assembly); //注册所有的命令锁类型
```

注意: 命令锁应该在事务开启前执行,所以需要在`AddUnitOfWorkBehaviors`之前添加`AddCommandLockBehavior`


## 使用命令锁

定义一个命令锁,实现`ICommandLock<TCommand>`接口,其中`TCommand`是命令类型,例如:

```csharp
public record PayOrderCommand(OrderId Id) : ICommand<OrderId>;

public class PayOrderCommandLock : ICommandLock<PayOrderCommand>
{
public Task<CommandLockSettings> GetLockKeysAsync(PayOrderCommand command,
CancellationToken cancellationToken = default)
{
return Task.FromResult(command.Id.ToCommandLockSettings());
}
}
```

其中`command.Id.ToCommandLockSettings()`是将`OrderId`转换为`CommandLockSettings``CommandLockSettings`是命令锁的配置,包含了锁的Key、获取锁之前可以等待的过期时间。

设计上,命令锁与命令是一对一关系,建议将命令锁与命令、命令处理器放在同一个类文件中,便于维护。

## 多key命令锁

命令锁支持多Key机制,即一个命令可以对应多个Key,例如:

```csharp

public class PayOrderCommandLock : ICommandLock<PayOrderCommand>
{
public Task<CommandLockSettings> GetLockKeysAsync(PayOrderCommand command,
CancellationToken cancellationToken = default)
{
var ids = new List<OrderId> { new OrderId(1), new OrderId(2) };
return Task.FromResult(ids.ToCommandLockSettings());
}
}
```

在这个例子中,`PayOrderCommand`对应两个Key,分别是`OrderId(1)``OrderId(2)`

当需要锁定多个Key时,CommandLockSettings会对多个Key进行排序,然后逐个锁定,如果其中一个Key锁定失败,则会释放已经锁定的Key。


## 可重入机制

命令锁实现了可重入机制,即在同一个请求上下文中,相同的Key可以重复获取锁,不会造成死锁。
例如上面示例的命令执行后序的事件处理过程中再次执行携带相同Key的命令锁,不会死锁。
File renamed without changes.
File renamed without changes.
6 changes: 5 additions & 1 deletion docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ nav:
- 强类型ID: domain/strong-typed-id.md
- 领域模型: domain/domain-entity.md
- 领域事件: domain/domain-event.md
- 集成事件转换器: domain/integration-converter.md
# - 值对象: domain/domain-value-object.md
- ID生成器:
- Snowflake: id-generator/snowflake.md
Expand All @@ -65,6 +64,10 @@ nav:
- 仓储: data/repository.md
- UnitOfWork: data/unit-of-work.md
- 事务处理: transaction/transactions.md
- 事件处理:
- 领域事件处理器: events/domain-event-handler.md
- 集成事件处理器: events/integration-event-handler.md
- 集成事件转换器: events/integration-converter.md
- 服务发现:
- 服务发现客户端: service-discovery/service-discovery-client.md
- Kubernetes: service-discovery/k8s-service-discovery-provider.md
Expand All @@ -75,6 +78,7 @@ nav:
- 并发控制:
- 乐观锁-行版本号: concurrency/row-version.md
- 悲观锁-Redis锁: concurrency/redis-lock.md
- 命令锁: concurrency/command-lock.md
- 多环境:
- 环境上下文: env/env-context.md
- 环境服务选择器: env/env-service-selector.md
Expand Down
7 changes: 7 additions & 0 deletions netcorepal-cloud-framework.sln
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NetCorePal.Extensions.Dto",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NetCorePal.Extensions.Dto.Tests", "test\NetCorePal.Extensions.Dto.Tests\NetCorePal.Extensions.Dto.Tests.csproj", "{57FCB1FF-0A79-4172-B208-4AD7FC04EF26}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NetCorePal.Extensions.Primitives.UnitTests", "test\NetCorePal.Extensions.Primitives.UnitTests\NetCorePal.Extensions.Primitives.UnitTests.csproj", "{F4C18BB0-F078-40AA-B09F-F2F4A06C2785}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -359,6 +361,10 @@ Global
{57FCB1FF-0A79-4172-B208-4AD7FC04EF26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{57FCB1FF-0A79-4172-B208-4AD7FC04EF26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{57FCB1FF-0A79-4172-B208-4AD7FC04EF26}.Release|Any CPU.Build.0 = Release|Any CPU
{F4C18BB0-F078-40AA-B09F-F2F4A06C2785}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F4C18BB0-F078-40AA-B09F-F2F4A06C2785}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F4C18BB0-F078-40AA-B09F-F2F4A06C2785}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F4C18BB0-F078-40AA-B09F-F2F4A06C2785}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -422,6 +428,7 @@ Global
{59BB1378-ABEE-4A1E-BF14-7C807F83C8E3} = {605436C1-2560-47BA-99D4-7A80B628230F}
{02772177-21D1-4CF8-8F13-FBE4F5DE0F22} = {605436C1-2560-47BA-99D4-7A80B628230F}
{57FCB1FF-0A79-4172-B208-4AD7FC04EF26} = {DB5A8331-2B57-4F2B-8F7D-02E167BCD8BB}
{F4C18BB0-F078-40AA-B09F-F2F4A06C2785} = {DB5A8331-2B57-4F2B-8F7D-02E167BCD8BB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D46201E5-1E3C-47A0-8584-4ACCC135CF9A}
Expand Down
103 changes: 103 additions & 0 deletions src/AspNetCore/CommandLocks/CommandLockBehavior.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using MediatR;
using NetCorePal.Extensions.DistributedLocks;
using NetCorePal.Extensions.Primitives;

namespace NetCorePal.Extensions.AspNetCore.CommandLocks;

public class CommandLockBehavior<TRequest, TResponse>(
IEnumerable<ICommandLock<TRequest>> commandLocks,
IDistributedLock distributedLock)
: IPipelineBehavior<TRequest, TResponse>
where TRequest : IBaseCommand
{
#pragma warning disable S3604
private readonly CommandLockedKeysHolder _lockedKeys = new();
#pragma warning restore S3604

public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
var count = commandLocks.Count();
if (count == 0)
{
return await next();
}

if (count > 1)
{
throw new InvalidOperationException("Only one ICommandLock<TRequest> is allowed");
}

var commandLock = commandLocks.First();
var options = await commandLock.GetLockKeysAsync(request, cancellationToken);
if (!string.IsNullOrEmpty(options.LockKey))
{
if (!_lockedKeys.LockedKeys.Keys.Contains(options.LockKey))
{
await using var lockHandler =
await TryAcquireAsync(options.LockKey, timeout: options.AcquireTimeout,
cancellationToken: cancellationToken);
if (lockHandler == null)
{
throw new CommandLockFailedException("Acquire Lock Faild", options.LockKey);
}

_lockedKeys.LockedKeys.Keys.Add(options.LockKey);
// 确保在执行next后,释放锁
return await next();
}
else
{
return await next();
}
}
else
{
return await LockAndRelease(options, 0, next, cancellationToken);
}
}


private async Task<TResponse> LockAndRelease(CommandLockSettings settings, int lockIndex,
RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
if (lockIndex >= settings.LockKeys!.Count)
{
return await next();
}

var key = settings.LockKeys[lockIndex];

if (!_lockedKeys.LockedKeys.Keys.Contains(key))
{
await using var lockHandler =
await TryAcquireAsync(key, timeout: settings.AcquireTimeout,
cancellationToken: cancellationToken);
if (lockHandler == null)
{
throw new CommandLockFailedException("Acquire Lock Faild", key);
}

_lockedKeys.LockedKeys.Keys.Add(key);
// 确保在执行LockAndRelease后,释放锁
return await LockAndRelease(settings, lockIndex + 1, next, cancellationToken);
}
else
{
return await LockAndRelease(settings, lockIndex + 1, next, cancellationToken);
}
}

private async Task<LockSynchronizationHandlerWarpper?> TryAcquireAsync(string key, TimeSpan timeout,
CancellationToken cancellationToken)
{
var handler = await distributedLock.TryAcquireAsync(key, timeout: timeout,
cancellationToken: cancellationToken);
if (handler == null)
{
return null;
}

return new LockSynchronizationHandlerWarpper(key, _lockedKeys.LockedKeys.Keys, handler);
}
}
14 changes: 14 additions & 0 deletions src/AspNetCore/CommandLocks/CommandLockFailedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace NetCorePal.Extensions.AspNetCore.CommandLocks;

/// <inheritdoc />
#pragma warning disable S3925
public sealed class CommandLockFailedException : Exception
#pragma warning restore S3925
{
public CommandLockFailedException(string message, string failedKey) : base(message)
{
FailedKey = failedKey;
}

public string FailedKey { get; private set; }
}
13 changes: 13 additions & 0 deletions src/AspNetCore/CommandLocks/CommandLockedKeysHolder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace NetCorePal.Extensions.AspNetCore.CommandLocks;

class CommandLockedKeysHolder
{
private static readonly AsyncLocal<KeysHolder> KeysHolderCurrent = new AsyncLocal<KeysHolder>();

public KeysHolder LockedKeys => KeysHolderCurrent.Value ??= new KeysHolder();
}

class KeysHolder
{
public HashSet<string> Keys { get; set; } = new HashSet<string>();
}
19 changes: 19 additions & 0 deletions src/AspNetCore/CommandLocks/LockSynchronizationHandlerWarpper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using NetCorePal.Extensions.DistributedLocks;

namespace NetCorePal.Extensions.AspNetCore.CommandLocks;

/// <summary>
/// 在释放锁后,从持有者集合中移除key
/// </summary>
/// <param name="key"></param>
/// <param name="holder"></param>
/// <param name="handler"></param>
class LockSynchronizationHandlerWarpper(string key, HashSet<string> holder, ILockSynchronizationHandler handler)
: IAsyncDisposable
{
public async ValueTask DisposeAsync()
{
await handler.DisposeAsync();
holder.Remove(key);
}
}
23 changes: 23 additions & 0 deletions src/AspNetCore/EntityIdExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using NetCorePal.Extensions.Domain;
using NetCorePal.Extensions.Primitives;

namespace NetCorePal.Extensions.Primitives;

public static class EntityIdExtensions
{
public static CommandLockSettings ToCommandLockSettings<TId>(this TId id,
int acquireSeconds = 10)
where TId : IEntityId
{
return new CommandLockSettings(typeof(TId).Name + "-" + id.ToString()!, acquireSeconds: acquireSeconds);
}

public static CommandLockSettings ToCommandLockSettings<TId>(this IEnumerable<TId> ids,
int acquireSeconds = 10)
where TId : IEntityId
{
var typeName = typeof(TId).Name;
return new CommandLockSettings(ids.Select(id => typeName + "-" + id.ToString()),
acquireSeconds: acquireSeconds);
}
}
14 changes: 14 additions & 0 deletions src/AspNetCore/NetCorePal.Extensions.AspNetCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,23 @@
<PackageReference Include="Microsoft.EntityFrameworkCore" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DistributedLocks.Abstractions\NetCorePal.Extensions.DistributedLocks.Abstractions.csproj" />
<ProjectReference Include="..\Domain.Abstractions\NetCorePal.Extensions.Domain.Abstractions.csproj" />
<ProjectReference Include="..\NetCorePal.Extensions.Dto\NetCorePal.Extensions.Dto.csproj" />
<ProjectReference Include="..\NewtonsoftJson\NetCorePal.Extensions.NewtonsoftJson.csproj" />
<ProjectReference Include="..\Primitives\NetCorePal.Extensions.Primitives.csproj" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Update="R.resx">
<Generator>ResXFileCodeGenerator</Generator>
<LastGenOutput>R.Designer.cs</LastGenOutput>
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<Compile Update="R.Designer.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
<DependentUpon>R.resx</DependentUpon>
</Compile>
</ItemGroup>
</Project>
42 changes: 42 additions & 0 deletions src/AspNetCore/ServiceCollectionExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using NetCorePal.Extensions.AspNetCore;
using NetCorePal.Extensions.AspNetCore.CommandLocks;
using NetCorePal.Extensions.AspNetCore.Validation;
using NetCorePal.Extensions.Domain.Json;
using NetCorePal.Extensions.Dto;
Expand Down Expand Up @@ -47,6 +48,17 @@ public static MediatRServiceConfiguration AddKnownExceptionValidationBehavior(
return cfg;
}

/// <summary>
/// 添加CommandLockBehavior,以支持命令锁
/// </summary>
/// <param name="cfg"></param>
/// <returns></returns>
public static MediatRServiceConfiguration AddCommandLockBehavior(this MediatRServiceConfiguration cfg)
{
cfg.AddOpenBehavior(typeof(CommandLockBehavior<,>));
return cfg;
}


/// <summary>
/// 将所有实现IQuery接口的类注册为查询类,添加到容器中
Expand Down Expand Up @@ -131,4 +143,34 @@ public static IMvcBuilder AddEntityIdSystemTextJson(this IMvcBuilder builder)
});
return builder;
}


/// <summary>
/// 添加CommandLocks
/// </summary>
/// <param name="services"></param>
/// <param name="assemblies"></param>
/// <returns></returns>
public static IServiceCollection AddCommandLocks(this IServiceCollection services, params Assembly[] assemblies)
{
foreach (var assembly in assemblies)
{
var types = assembly.GetTypes();

foreach (var type in types.Where(p => p is { IsClass: true, IsAbstract: false, IsGenericType: false }))
{
var interfaces = type.GetInterfaces();
foreach (var @interface in interfaces)
{
if (@interface.IsGenericType &&
@interface.GetGenericTypeDefinition() == typeof(ICommandLock<>))
{
services.AddTransient(@interface, type);
}
}
}
}

return services;
}
}
Loading

0 comments on commit eb1ce4e

Please sign in to comment.