Skip to content

Commit

Permalink
Add new option SubscriberParallelExecuteThreadCount,SubscriberParalle…
Browse files Browse the repository at this point in the history
…lExecuteBufferFactor to better support parallel execte subscriber. ( #1513)
  • Loading branch information
yang-xiaodong committed Apr 16, 2024
1 parent 31d77b0 commit b4619ee
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 42 deletions.
23 changes: 20 additions & 3 deletions docs/content/user-guide/en/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,33 @@ The expiration time (in seconds) of the failed message. When the message is sent
If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.

#### EnableConsumerPrefetch
#### [Obsolete] EnableConsumerPrefetch

> Default: false, Before version 7.0 the default behavior is true
By default, CAP will only read one message from the message queue, then execute the subscription method. After the execution is done, it will read the next message for execution.
If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution.
Renamed to `EnableSubscriberParallelExecute` option, Please use the new option.

### EnableSubscriberParallelExecute

> Default: false
If `true`, CAP will prefetch some message from the broker as buffered, then execute the subscriber method. After the execution is done, it will fetch the next batch for execution.

!!! note "Precautions"
Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default , that is to say, if the message backlog of more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up again and executed again

### SubscriberParallelExecuteThreadCount

> Default: `Environment.ProcessorCount`
With the `EnableSubscriberParallelExecute` option enabled, specify the number of parallel task execution threads.

### SubscriberParallelExecuteBufferFactor

> Default: 1
With the `EnableSubscriberParallelExecute` option enabled, multiplier used to determine the buffered capacity size in subscriber parallel execution. The buffer capacity is computed by multiplying this factor with the value of `SubscriberParallelExecuteThreadCount`, which represents the number of threads allocated for parallel processing.

#### EnablePublishParallelSend

> Default: false, The (7.2 <= Version < 8.1) the default behavior is true
Expand Down
23 changes: 20 additions & 3 deletions docs/content/user-guide/zh/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,33 @@ services.AddCap(config =>

在同时配合使用 `EnableConsumerPrefetch` 时,请参考 issue [#1399](https://github.com/dotnetcore/CAP/issues/1399) 以清晰其预期行为。

#### EnableConsumerPrefetch
#### [已过时] EnableConsumerPrefetch

> 默认值: false, 在 7.0 版本之前默认行为 true
默认情况下,CAP只会从消息队列读取一条,然后执行订阅方法,执行完成后才会读取下一条来执行.
如果设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行。
该配置项已被重命名为 `EnableSubscriberParallelExecute`,请使用新选项。

### EnableSubscriberParallelExecute

> 默认值: false
如果设置为 `true`,CAP将提前从Broker拉取一批消息置于内存缓冲区,然后执行订阅方法;当订阅方法执行完成后,拉取下一批消息至于缓冲区然后执行。

!!! note "注意事项"
设置为 true 可能会产生一些问题,当订阅方法执行过慢耗时太久时,会导致重试线程拾取到还未执行的的消息。重试线程默认拾取4分钟前(FallbackWindowLookbackSeconds 配置项)的消息,也就是说如果消费端积压了超过4分钟(FallbackWindowLookbackSeconds 配置项)的消息就会被重新拾取到再次执行

### SubscriberParallelExecuteThreadCount

> Default: `Environment.ProcessorCount`
当启用 `EnableSubscriberParallelExecute` 时, 可通过此参数执行并行处理的线程数,默认值为处理器个数。

### SubscriberParallelExecuteBufferFactor

> Default: 1
当启用 `EnableSubscriberParallelExecute` 时, 通过此参数设置缓冲区和线程数的因子系数,也就是缓冲区大小等于 `SubscriberParallelExecuteThreadCount``SubscriberParallelExecuteBufferFactor`.

#### EnablePublishParallelSend

> 默认值: false
Expand Down
40 changes: 33 additions & 7 deletions src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
namespace DotNetCore.CAP;

/// <summary>
/// Represents all the options you can use to configure the system.
/// Provides options to customize various aspects of the message processing pipeline. This includes settings for message expiration,
/// retry mechanisms, concurrency management, and serialization, among others. This class allows fine-tuning
/// CAP's behavior to better align with specific application requirements, such as adjusting threading models for
/// subscriber message processing, setting message expiry times, and customizing serialization settings.
/// </summary>
public class CapOptions
{
Expand All @@ -24,7 +27,9 @@ public CapOptions()
FailedRetryCount = 50;
ConsumerThreadCount = 1;
EnablePublishParallelSend = false;
EnableConsumerPrefetch = false;
EnableSubscriberParallelExecute = false;
SubscriberParallelExecuteThreadCount = Environment.ProcessorCount;
SubscriberParallelExecuteBufferFactor = 1;
Extensions = new List<ICapOptionsExtension>();
Version = "v1";
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower();
Expand Down Expand Up @@ -92,20 +97,41 @@ public CapOptions()
public int ConsumerThreadCount { get; set; }

/// <summary>
/// If true, the message will be prefetch to memory queue for parallel execute by .net thread pool.
/// Default is false
/// If true, the message will be buffered to memory queue for parallel execute.
/// <para>The option is obsolete, use EnableSubscriberParallelExecute instead!</para>
/// </summary>
public bool EnableConsumerPrefetch { get; set; }
[Obsolete("Renamed to EnableSubscriberParallelExecute option.")]
public bool EnableConsumerPrefetch { get => EnableSubscriberParallelExecute; set => EnableSubscriberParallelExecute = value; }

/// <summary>
/// If true, the message will be buffered to memory queue for parallel execute;
/// Default is false.
/// <para>Use <see cref="SubscriberParallelExecuteThreadCount"/> to specify the number of parallel threads.</para>
/// </summary>
public bool EnableSubscriberParallelExecute { get; set; }

/// <summary>
/// With the <c>EnableSubscriberParallelExecute</c> option enabled, specify the number of parallel task execution threads.
/// Default is <see cref="Environment.ProcessorCount"/>.
/// </summary>
public int SubscriberParallelExecuteThreadCount { get; set; }

/// <summary>
/// With the <c>EnableSubscriberParallelExecute</c> option enabled, multiplier used to determine the buffered capacity size in subscriber parallel execution.
/// The buffer capacity is computed by multiplying this factor with the value of <c>SubscriberParallelExecuteThreadCount</c>,
/// which represents the number of threads allocated for parallel processing.
/// </summary>
public int SubscriberParallelExecuteBufferFactor { get; set; }

/// <summary>
/// If true, the message send task will be parallel execute by .net thread pool.
/// Default is false
/// Default is false.
/// </summary>
public bool EnablePublishParallelSend { get; set; }

/// <summary>
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
/// <see cref="ConsumerThreadCount" /> value is.
/// <see cref="SubscriberParallelExecuteThreadCount" /> value is.
/// Default is false.
/// </summary>
public bool UseDispatchingPerGroup { get; set; }
Expand Down
23 changes: 12 additions & 11 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class Dispatcher : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, long> _schedulerQueue;
private readonly bool _enablePrefetch;
private readonly bool _enableParallelExecute;
private readonly bool _enableParallelSend;

private Channel<MediumMessage> _publishedChannel = default!;
Expand All @@ -45,7 +45,7 @@ public Dispatcher(ILogger<Dispatcher> logger,
_executor = executor;
_schedulerQueue = new PriorityQueue<MediumMessage, long>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
_enableParallelExecute = options.Value.EnableSubscriberParallelExecute;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}

Expand All @@ -66,20 +66,20 @@ public async Task Start(CancellationToken stoppingToken)

await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return valuetask

if (_enablePrefetch)
if (_enableParallelExecute)
{
var capacity = _options.ConsumerThreadCount * 300;
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
new BoundedChannelOptions(_options.SubscriberParallelExecuteThreadCount * _options.SubscriberParallelExecuteBufferFactor)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ConsumerThreadCount == 1,
SingleReader = _options.SubscriberParallelExecuteThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

await Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
.Select(_ => Task.Run(Processing, _tasksCts.Token)).ToArray()).ConfigureAwait(false);
await Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadCount)
.Select(_ => Task.Run(Processing, _tasksCts.Token)).ToArray())
.ConfigureAwait(false);
}
_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -172,7 +172,7 @@ public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorD
{
if (_tasksCts!.IsCancellationRequested) return;

if (_enablePrefetch)
if (_enableParallelExecute)
{
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
{
Expand Down Expand Up @@ -246,15 +246,16 @@ private async ValueTask Processing()
{
var item1 = message.Item1;
var item2 = message.Item2;
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
//expected
}
catch (Exception e)
{
_logger.LogError(e, "An exception occurred when invoking subscriber. MessageId:{MessageId}", message.Item1.DbId);
_logger.LogError(e,
$"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}");
}
}
catch (OperationCanceledException)
Expand Down
34 changes: 16 additions & 18 deletions src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal class DispatcherPerGroup : IDispatcher
private readonly IMessageSender _sender;
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, DateTime> _schedulerQueue;
private readonly bool _enablePrefetch;
private readonly bool _enableParallelExecute;
private readonly bool _enableParallelSend;

private Channel<MediumMessage> _publishedChannel = default!;
Expand All @@ -47,7 +47,7 @@ public DispatcherPerGroup(ILogger<Dispatcher> logger,
_executor = executor;
_schedulerQueue = new PriorityQueue<MediumMessage, DateTime>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
_enableParallelExecute = options.Value.EnableSubscriberParallelExecute;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}

Expand All @@ -70,7 +70,7 @@ public async Task Start(CancellationToken stoppingToken)

_receivedChannels =
new ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>>(
_options.ConsumerThreadCount, _options.ConsumerThreadCount * 2);
_options.SubscriberParallelExecuteThreadCount, _options.SubscriberParallelExecuteThreadCount * 2);

GetOrCreateReceiverChannel(_options.DefaultGroupName);

Expand Down Expand Up @@ -191,19 +191,25 @@ public void Dispose()
"Creating receiver channel for group {ConsumerGroup} with thread count {ConsumerThreadCount}", group,
_options.ConsumerThreadCount);

var capacity = _options.ConsumerThreadCount * 300;
var channel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
new BoundedChannelOptions(_options.SubscriberParallelExecuteThreadCount * _options.SubscriberParallelExecuteBufferFactor)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ConsumerThreadCount == 1,
SingleReader = _options.SubscriberParallelExecuteThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
.Select(_ => Task.Run(() => Processing(group, channel), _tasksCts!.Token)).ToArray());

if (_enableParallelExecute)
{
Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadCount)
.Select(_ => Task.Run(() => Processing(group, channel), _tasksCts!.Token)).ToArray())
.ConfigureAwait(false);
}
else
{
_ = Task.Run(() => Processing(group, channel), _tasksCts!.Token).ConfigureAwait(false);
}
return channel;
});
}
Expand Down Expand Up @@ -253,15 +259,7 @@ private async ValueTask Processing(string group, Channel<(MediumMessage, Consume

var item1 = message.Item1;
var item2 = message.Item2;

if (_enablePrefetch)
{
_ = Task.Run(() => _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false));
}
else
{
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Expand Down

0 comments on commit b4619ee

Please sign in to comment.