From 1cadd3ab86bf8518f65f537ba2422525dd2c5f4a Mon Sep 17 00:00:00 2001 From: Saman Azadi <60857846+samanazadi1996@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:51:13 +0330 Subject: [PATCH] Refactor to file-scoped namespaces (#1586) Modified : src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs Modified : src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs Modified : src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs Modified : src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs Modified : src/DotNetCore.CAP.NATS/IConnectionPool.cs Modified : src/DotNetCore.CAP.NATS/ITransport.NATS.cs Modified : src/DotNetCore.CAP.NATS/NATSConsumerClient.cs Modified : src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs Modified : src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs Modified : src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs Modified : src/DotNetCore.CAP/Internal/ISnowflakeId.cs --- .../CAP.NATSCapOptionsExtension.cs | 33 +- src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs | 73 ++-- .../CAP.Options.Extensions.cs | 57 ++- .../IConnectionPool.Default.cs | 107 +++--- src/DotNetCore.CAP.NATS/IConnectionPool.cs | 15 +- src/DotNetCore.CAP.NATS/ITransport.NATS.cs | 81 ++--- src/DotNetCore.CAP.NATS/NATSConsumerClient.cs | 341 +++++++++--------- .../NATSConsumerClientFactory.cs | 39 +- .../RabbitMQBasicConsumer.cs | 217 ++++++----- .../Diagnostics/EventCounterSource.Cap.cs | 129 ++++--- src/DotNetCore.CAP/Internal/ISnowflakeId.cs | 11 +- 11 files changed, 546 insertions(+), 557 deletions(-) diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs index a7cdf1252..9ed695be5 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -7,26 +7,25 @@ using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace -namespace DotNetCore.CAP +namespace DotNetCore.CAP; + +internal sealed class NATSCapOptionsExtension : ICapOptionsExtension { - internal sealed class NATSCapOptionsExtension : ICapOptionsExtension - { - private readonly Action _configure; + private readonly Action _configure; - public NATSCapOptionsExtension(Action configure) - { - _configure = configure; - } + public NATSCapOptionsExtension(Action configure) + { + _configure = configure; + } - public void AddServices(IServiceCollection services) - { - services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream")); + public void AddServices(IServiceCollection services) + { + services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream")); - services.Configure(_configure); + services.Configure(_configure); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - } + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs index b9482e4f4..c2651d202 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -7,43 +7,42 @@ using NATS.Client.JetStream; // ReSharper disable once CheckNamespace -namespace DotNetCore.CAP +namespace DotNetCore.CAP; + +/// +/// Provides programmatic configuration for the CAP NATS project. +/// +public class NATSOptions { /// - /// Provides programmatic configuration for the CAP NATS project. + /// Gets or sets the server url/urls used to connect to the NATs server. + /// + /// This may contain username/password information. + public string Servers { get; set; } = "nats://127.0.0.1:4222"; + + /// + /// connection pool size, default is 10 + /// + public int ConnectionPoolSize { get; set; } = 10; + + /// + /// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true. + /// + public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true; + + /// + /// Used to setup all NATs client options + /// + public Options? Options { get; set; } + + public Action? StreamOptions { get; set; } + + public Action? ConsumerOptions { get; set; } + + /// + /// If you need to get additional native delivery args, you can use this function to write into . /// - public class NATSOptions - { - /// - /// Gets or sets the server url/urls used to connect to the NATs server. - /// - /// This may contain username/password information. - public string Servers { get; set; } = "nats://127.0.0.1:4222"; - - /// - /// connection pool size, default is 10 - /// - public int ConnectionPoolSize { get; set; } = 10; - - /// - /// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true. - /// - public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true; - - /// - /// Used to setup all NATs client options - /// - public Options? Options { get; set; } - - public Action? StreamOptions { get; set; } - - public Action? ConsumerOptions { get; set; } - - /// - /// If you need to get additional native delivery args, you can use this function to write into . - /// - public Func>>? CustomHeadersBuilder { get; set; } - - public Func NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; - } + public Func>>? CustomHeadersBuilder { get; set; } + + public Func NormalizeStreamName { get; set; } = origin => origin.Split('.')[0]; } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs index c6be74b24..1068f7ab3 100644 --- a/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs @@ -1,44 +1,43 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using DotNetCore.CAP; // ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.DependencyInjection +namespace Microsoft.Extensions.DependencyInjection; + +public static class CapOptionsExtensions { - public static class CapOptionsExtensions + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// NATS bootstrap server urls. + public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null) { - /// - /// Configuration to use NATS in CAP. - /// - /// CAP configuration options - /// NATS bootstrap server urls. - public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null) + return options.UseNATS(opt => { - return options.UseNATS(opt => - { - if (bootstrapServers != null) - opt.Servers = bootstrapServers; - }); - } + if (bootstrapServers != null) + opt.Servers = bootstrapServers; + }); + } - /// - /// Configuration to use NATS in CAP. - /// - /// CAP configuration options - /// Provides programmatic configuration for the NATS. - /// - public static CapOptions UseNATS(this CapOptions options, Action configure) + /// + /// Configuration to use NATS in CAP. + /// + /// CAP configuration options + /// Provides programmatic configuration for the NATS. + /// + public static CapOptions UseNATS(this CapOptions options, Action configure) + { + if (configure == null) { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + throw new ArgumentNullException(nameof(configure)); + } - options.RegisterExtension(new NATSCapOptionsExtension(configure)); + options.RegisterExtension(new NATSCapOptionsExtension(configure)); - return options; - } + return options; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs index 361b959a7..0af8a8720 100644 --- a/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -8,78 +8,77 @@ using Microsoft.Extensions.Options; using NATS.Client; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +public class ConnectionPool : IConnectionPool, IDisposable { - public class ConnectionPool : IConnectionPool, IDisposable - { - private readonly NATSOptions _options; - private readonly ConcurrentQueue _connectionPool; + private readonly NATSOptions _options; + private readonly ConcurrentQueue _connectionPool; - private readonly ConnectionFactory _connectionFactory; - private int _pCount; - private int _maxSize; + private readonly ConnectionFactory _connectionFactory; + private int _pCount; + private int _maxSize; - public ConnectionPool(ILogger logger, IOptions options) - { - _options = options.Value; - _connectionPool = new ConcurrentQueue(); - _connectionFactory = new ConnectionFactory(); - _maxSize = _options.ConnectionPoolSize; + public ConnectionPool(ILogger logger, IOptions options) + { + _options = options.Value; + _connectionPool = new ConcurrentQueue(); + _connectionFactory = new ConnectionFactory(); + _maxSize = _options.ConnectionPoolSize; - logger.LogDebug("NATS configuration: {0}", options.Value.Options); - } + logger.LogDebug("NATS configuration: {0}", options.Value.Options); + } - public string ServersAddress => _options.Servers; + public string ServersAddress => _options.Servers; - public IConnection RentConnection() + public IConnection RentConnection() + { + if (_connectionPool.TryDequeue(out var connection)) { - if (_connectionPool.TryDequeue(out var connection)) - { - Interlocked.Decrement(ref _pCount); - - return connection; - } - - if (_options.Options != null) - { - _options.Options.Url = _options.Servers; - connection = _connectionFactory.CreateConnection(_options.Options); - } - else - { - connection = _connectionFactory.CreateConnection(_options.Servers); - } + Interlocked.Decrement(ref _pCount); return connection; } - public bool Return(IConnection connection) + if (_options.Options != null) { - if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED) - { - _connectionPool.Enqueue(connection); - - return true; - } + _options.Options.Url = _options.Servers; + connection = _connectionFactory.CreateConnection(_options.Options); + } + else + { + connection = _connectionFactory.CreateConnection(_options.Servers); + } - if (!connection.IsReconnecting()) - { - connection.Dispose(); - } + return connection; + } - Interlocked.Decrement(ref _pCount); + public bool Return(IConnection connection) + { + if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED) + { + _connectionPool.Enqueue(connection); - return false; + return true; } - public void Dispose() + if (!connection.IsReconnecting()) { - _maxSize = 0; + connection.Dispose(); + } + + Interlocked.Decrement(ref _pCount); - while (_connectionPool.TryDequeue(out var context)) - { - context.Dispose(); - } + return false; + } + + public void Dispose() + { + _maxSize = 0; + + while (_connectionPool.TryDequeue(out var context)) + { + context.Dispose(); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/IConnectionPool.cs b/src/DotNetCore.CAP.NATS/IConnectionPool.cs index e6d78e6f8..b43004527 100644 --- a/src/DotNetCore.CAP.NATS/IConnectionPool.cs +++ b/src/DotNetCore.CAP.NATS/IConnectionPool.cs @@ -1,16 +1,15 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using NATS.Client; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +public interface IConnectionPool { - public interface IConnectionPool - { - string ServersAddress { get; } + string ServersAddress { get; } - IConnection RentConnection(); + IConnection RentConnection(); - bool Return(IConnection connection); - } + bool Return(IConnection connection); } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs index cdb07c63a..1049989e4 100644 --- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -10,61 +10,60 @@ using NATS.Client; using NATS.Client.JetStream; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal class NATSTransport : ITransport { - internal class NATSTransport : ITransport + private readonly IConnectionPool _connectionPool; + private readonly ILogger _logger; + private readonly JetStreamOptions _jetStreamOptions; + + public NATSTransport(ILogger logger, IConnectionPool connectionPool) { - private readonly IConnectionPool _connectionPool; - private readonly ILogger _logger; - private readonly JetStreamOptions _jetStreamOptions; + _logger = logger; + _connectionPool = connectionPool; - public NATSTransport(ILogger logger, IConnectionPool connectionPool) - { - _logger = logger; - _connectionPool = connectionPool; + _jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); + } - _jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build(); - } + public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); - public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress); + public async Task SendAsync(TransportMessage message) + { + var connection = _connectionPool.RentConnection(); - public async Task SendAsync(TransportMessage message) + try { - var connection = _connectionPool.RentConnection(); - - try + var msg = new Msg(message.GetName(), message.Body.ToArray()); + foreach (var header in message.Headers) { - var msg = new Msg(message.GetName(), message.Body.ToArray()); - foreach (var header in message.Headers) - { - msg.Header[header.Key] = header.Value; - } - - var js = connection.CreateJetStreamContext(_jetStreamOptions); + msg.Header[header.Key] = header.Value; + } - var builder = PublishOptions.Builder().WithMessageId(message.GetId()); + var js = connection.CreateJetStreamContext(_jetStreamOptions); - var resp = await js.PublishAsync(msg, builder.Build()); + var builder = PublishOptions.Builder().WithMessageId(message.GetId()); - if (resp.Seq > 0) - { - _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); + var resp = await js.PublishAsync(msg, builder.Build()); - return OperateResult.Success; - } - - throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); - } - catch (Exception ex) + if (resp.Seq > 0) { - var warpEx = new PublisherSentFailedException(ex.Message, ex); + _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); - return OperateResult.Failed(warpEx); - } - finally - { - _connectionPool.Return(connection); + return OperateResult.Success; } + + throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); + } + catch (Exception ex) + { + var warpEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(warpEx); + } + finally + { + _connectionPool.Return(connection); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index f8f2dc31f..0e0216d66 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -13,241 +13,240 @@ using NATS.Client; using NATS.Client.JetStream; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal sealed class NATSConsumerClient : IConsumerClient { - internal sealed class NATSConsumerClient : IConsumerClient - { - private static readonly object ConnectionLock = new(); + private static readonly object ConnectionLock = new(); - private readonly string _groupName; - private readonly byte _groupConcurrent; - private readonly IServiceProvider _serviceProvider; - private readonly NATSOptions _natsOptions; - private readonly SemaphoreSlim _semaphore; - private IConnection? _consumerClient; + private readonly string _groupName; + private readonly byte _groupConcurrent; + private readonly IServiceProvider _serviceProvider; + private readonly NATSOptions _natsOptions; + private readonly SemaphoreSlim _semaphore; + private IConnection? _consumerClient; - public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) - { - _groupName = groupName; - _groupConcurrent = groupConcurrent; - _serviceProvider = serviceProvider; - _semaphore = new SemaphoreSlim(groupConcurrent); - _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); - } + public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) + { + _groupName = groupName; + _groupConcurrent = groupConcurrent; + _serviceProvider = serviceProvider; + _semaphore = new SemaphoreSlim(groupConcurrent); + _natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options)); + } - public Func? OnMessageCallback { get; set; } + public Func? OnMessageCallback { get; set; } - public Action? OnLogCallback { get; set; } + public Action? OnLogCallback { get; set; } - public BrokerAddress BrokerAddress => new("NATS", _natsOptions.Servers); + public BrokerAddress BrokerAddress => new("NATS", _natsOptions.Servers); - public ICollection FetchTopics(IEnumerable topicNames) + public ICollection FetchTopics(IEnumerable topicNames) + { + if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation) { - if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation) - { - Connect(); + Connect(); - var jsm = _consumerClient!.CreateJetStreamManagementContext(); + var jsm = _consumerClient!.CreateJetStreamManagementContext(); - var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); + var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x)); - foreach (var streamSubjectsGroup in streamSubjectsGroups) - { - var builder = StreamConfiguration.Builder() - .WithName(streamSubjectsGroup.Key) - .WithNoAck(false) - .WithStorageType(StorageType.Memory) - .WithSubjects(streamSubjectsGroup.ToList()); + foreach (var streamSubjectsGroup in streamSubjectsGroups) + { + var builder = StreamConfiguration.Builder() + .WithName(streamSubjectsGroup.Key) + .WithNoAck(false) + .WithStorageType(StorageType.Memory) + .WithSubjects(streamSubjectsGroup.ToList()); - _natsOptions.StreamOptions?.Invoke(builder); + _natsOptions.StreamOptions?.Invoke(builder); + try + { + jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist + + jsm.UpdateStream(builder.Build()); + } + catch (NATSJetStreamException) + { try { - jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist - - jsm.UpdateStream(builder.Build()); + jsm.AddStream(builder.Build()); } - catch (NATSJetStreamException) + catch { - try - { - jsm.AddStream(builder.Build()); - } - catch - { - // ignored - } + // ignored } } } - - return topicNames.ToList(); } - public void Subscribe(IEnumerable topics) + return topicNames.ToList(); + } + + public void Subscribe(IEnumerable topics) + { + if (topics == null) { - if (topics == null) - { - throw new ArgumentNullException(nameof(topics)); - } + throw new ArgumentNullException(nameof(topics)); + } - Connect(); + Connect(); - var js = _consumerClient!.CreateJetStreamContext(); - var streamGroup = topics.GroupBy(x => _natsOptions.NormalizeStreamName(x)); + var js = _consumerClient!.CreateJetStreamContext(); + var streamGroup = topics.GroupBy(x => _natsOptions.NormalizeStreamName(x)); - lock (ConnectionLock) + lock (ConnectionLock) + { + foreach (var subjectStream in streamGroup) { - foreach (var subjectStream in streamGroup) + var groupName = Helper.Normalized(_groupName); + + foreach (var subject in subjectStream) { - var groupName = Helper.Normalized(_groupName); + try + { + var consumerConfig = ConsumerConfiguration.Builder() + .WithDurable(Helper.Normalized(groupName + "-" + subject)) + .WithDeliverPolicy(DeliverPolicy.New) + .WithAckWait(30000) + .WithAckPolicy(AckPolicy.Explicit); + + _natsOptions.ConsumerOptions?.Invoke(consumerConfig); - foreach (var subject in subjectStream) + var pso = PushSubscribeOptions.Builder() + .WithStream(subjectStream.Key) + .WithConfiguration(consumerConfig.Build()) + .Build(); + + js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); + } + catch (Exception e) { - try + OnLogCallback!(new LogMessageEventArgs() { - var consumerConfig = ConsumerConfiguration.Builder() - .WithDurable(Helper.Normalized(groupName + "-" + subject)) - .WithDeliverPolicy(DeliverPolicy.New) - .WithAckWait(30000) - .WithAckPolicy(AckPolicy.Explicit); - - _natsOptions.ConsumerOptions?.Invoke(consumerConfig); - - var pso = PushSubscribeOptions.Builder() - .WithStream(subjectStream.Key) - .WithConfiguration(consumerConfig.Build()) - .Build(); - - js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); - } - catch (Exception e) - { - OnLogCallback!(new LogMessageEventArgs() - { - LogType = MqLogType.ConnectError, - Reason = $"An error was encountered when attempting to subscribe to subject: {subject}.{Environment.NewLine}" + - $"{e.Message}" - }); - } + LogType = MqLogType.ConnectError, + Reason = $"An error was encountered when attempting to subscribe to subject: {subject}.{Environment.NewLine}" + + $"{e.Message}" + }); } } } } + } - public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + while (true) { - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - cancellationToken.WaitHandle.WaitOne(timeout); - } - // ReSharper disable once FunctionNeverReturns + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); } + // ReSharper disable once FunctionNeverReturns + } - private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e) + private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e) + { + if (_groupConcurrent > 0) { - if (_groupConcurrent > 0) - { - _semaphore.Wait(); - Task.Run(() => Consume()).ConfigureAwait(false); - } - else - { - Consume().GetAwaiter().GetResult(); - } + _semaphore.Wait(); + Task.Run(() => Consume()).ConfigureAwait(false); + } + else + { + Consume().GetAwaiter().GetResult(); + } - Task Consume() - { - var headers = new Dictionary(); + Task Consume() + { + var headers = new Dictionary(); - foreach (string h in e.Message.Header.Keys) - { - headers.Add(h, e.Message.Header[h]); - } + foreach (string h in e.Message.Header.Keys) + { + headers.Add(h, e.Message.Header[h]); + } - headers.Add(Headers.Group, _groupName); + headers.Add(Headers.Group, _groupName); - if (_natsOptions.CustomHeadersBuilder != null) + if (_natsOptions.CustomHeadersBuilder != null) + { + var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) { - var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + headers[customHeader.Key] = customHeader.Value; } - - return OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); } + + return OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); } + } - public void Commit(object? sender) + public void Commit(object? sender) + { + if (sender is Msg msg) { - if (sender is Msg msg) - { - msg.Ack(); - } - _semaphore.Release(); + msg.Ack(); } + _semaphore.Release(); + } - public void Reject(object? sender) + public void Reject(object? sender) + { + if (sender is Msg msg) { - if (sender is Msg msg) - { - msg.Nak(); - } - _semaphore.Release(); + msg.Nak(); } + _semaphore.Release(); + } + + public void Dispose() + { + _consumerClient?.Dispose(); + } - public void Dispose() + public void Connect() + { + if (_consumerClient != null) { - _consumerClient?.Dispose(); + return; } - public void Connect() + lock (ConnectionLock) { - if (_consumerClient != null) + if (_consumerClient == null) { - return; - } - - lock (ConnectionLock) - { - if (_consumerClient == null) - { - var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); - opts.Url ??= _natsOptions.Servers; - opts.DisconnectedEventHandler = DisconnectedEventHandler; - opts.AsyncErrorEventHandler = AsyncErrorEventHandler; - opts.Timeout = 5000; - opts.AllowReconnect = false; - opts.NoEcho = true; - - _consumerClient = new ConnectionFactory().CreateConnection(opts); - } + var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions(); + opts.Url ??= _natsOptions.Servers; + opts.DisconnectedEventHandler = DisconnectedEventHandler; + opts.AsyncErrorEventHandler = AsyncErrorEventHandler; + opts.Timeout = 5000; + opts.AllowReconnect = false; + opts.NoEcho = true; + + _consumerClient = new ConnectionFactory().CreateConnection(opts); } } + } - private void DisconnectedEventHandler(object? sender, ConnEventArgs e) - { - if (e.Error is null) return; + private void DisconnectedEventHandler(object? sender, ConnEventArgs e) + { + if (e.Error is null) return; - var logArgs = new LogMessageEventArgs - { - LogType = MqLogType.ConnectError, - Reason = e.Error.ToString() - }; - OnLogCallback!(logArgs); - } + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ConnectError, + Reason = e.Error.ToString() + }; + OnLogCallback!(logArgs); + } - private void AsyncErrorEventHandler(object? sender, ErrEventArgs e) + private void AsyncErrorEventHandler(object? sender, ErrEventArgs e) + { + var logArgs = new LogMessageEventArgs { - var logArgs = new LogMessageEventArgs - { - LogType = MqLogType.AsyncErrorEvent, - Reason = e.Error - }; - OnLogCallback!(logArgs); - } + LogType = MqLogType.AsyncErrorEvent, + Reason = e.Error + }; + OnLogCallback!(logArgs); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs index 2e1386664..ced7043e2 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs @@ -1,35 +1,34 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Options; -namespace DotNetCore.CAP.NATS +namespace DotNetCore.CAP.NATS; + +internal sealed class NATSConsumerClientFactory : IConsumerClientFactory { - internal sealed class NATSConsumerClientFactory : IConsumerClientFactory + private readonly IOptions _natsOptions; + private readonly IServiceProvider _serviceProvider; + + public NATSConsumerClientFactory(IOptions natsOptions, IServiceProvider serviceProvider) { - private readonly IOptions _natsOptions; - private readonly IServiceProvider _serviceProvider; + _natsOptions = natsOptions; + _serviceProvider = serviceProvider; + } - public NATSConsumerClientFactory(IOptions natsOptions, IServiceProvider serviceProvider) + public IConsumerClient Create(string groupName, byte groupConcurrent) + { + try { - _natsOptions = natsOptions; - _serviceProvider = serviceProvider; + var client = new NATSConsumerClient(groupName, groupConcurrent, _natsOptions, _serviceProvider); + client.Connect(); + return client; } - - public IConsumerClient Create(string groupName, byte groupConcurrent) + catch (System.Exception e) { - try - { - var client = new NATSConsumerClient(groupName, groupConcurrent, _natsOptions, _serviceProvider); - client.Connect(); - return client; - } - catch (System.Exception e) - { - throw new BrokerConnectionException(e); - } + throw new BrokerConnectionException(e); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 39c74a9a9..66815e16d 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; @@ -11,146 +11,145 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; -namespace DotNetCore.CAP.RabbitMQ +namespace DotNetCore.CAP.RabbitMQ; + +public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer { - public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer + private readonly SemaphoreSlim _semaphore; + private readonly string _groupName; + private readonly bool _usingTaskRun; + private readonly Func _msgCallback; + private readonly Action _logCallback; + private readonly Func>>? _customHeadersBuilder; + private readonly IServiceProvider _serviceProvider; + + public RabbitMQBasicConsumer(IModel? model, + byte concurrent, string groupName, + Func msgCallback, + Action logCallback, + Func>>? customHeadersBuilder, + IServiceProvider serviceProvider) + : base(model) { - private readonly SemaphoreSlim _semaphore; - private readonly string _groupName; - private readonly bool _usingTaskRun; - private readonly Func _msgCallback; - private readonly Action _logCallback; - private readonly Func>>? _customHeadersBuilder; - private readonly IServiceProvider _serviceProvider; - - public RabbitMQBasicConsumer(IModel? model, - byte concurrent, string groupName, - Func msgCallback, - Action logCallback, - Func>>? customHeadersBuilder, - IServiceProvider serviceProvider) - : base(model) - { - _semaphore = new SemaphoreSlim(concurrent); - _groupName = groupName; - _usingTaskRun = concurrent > 0; - _msgCallback = msgCallback; - _logCallback = logCallback; - _customHeadersBuilder = customHeadersBuilder; - _serviceProvider = serviceProvider; - } + _semaphore = new SemaphoreSlim(concurrent); + _groupName = groupName; + _usingTaskRun = concurrent > 0; + _msgCallback = msgCallback; + _logCallback = logCallback; + _customHeadersBuilder = customHeadersBuilder; + _serviceProvider = serviceProvider; + } - public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, - string routingKey, IBasicProperties properties, ReadOnlyMemory body) + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IBasicProperties properties, ReadOnlyMemory body) + { + if (_usingTaskRun) { - if (_usingTaskRun) - { - await _semaphore.WaitAsync(); + await _semaphore.WaitAsync(); - _ = Task.Run(Consume).ConfigureAwait(false); - } - else - { - await Consume().ConfigureAwait(false); - } + _ = Task.Run(Consume).ConfigureAwait(false); + } + else + { + await Consume().ConfigureAwait(false); + } - Task Consume() - { - var headers = new Dictionary(); + Task Consume() + { + var headers = new Dictionary(); - if (properties.Headers != null) - foreach (var header in properties.Headers) - { - if (header.Value is byte[] val) - headers.Add(header.Key, Encoding.UTF8.GetString(val)); - else - headers.Add(header.Key, header.Value?.ToString()); - } + if (properties.Headers != null) + foreach (var header in properties.Headers) + { + if (header.Value is byte[] val) + headers.Add(header.Key, Encoding.UTF8.GetString(val)); + else + headers.Add(header.Key, header.Value?.ToString()); + } - headers.Add(Messages.Headers.Group, _groupName); + headers.Add(Messages.Headers.Group, _groupName); - if (_customHeadersBuilder != null) + if (_customHeadersBuilder != null) + { + var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + var customHeaders = _customHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) { - var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - var customHeaders = _customHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + headers[customHeader.Key] = customHeader.Value; } + } - var message = new TransportMessage(headers, body); + var message = new TransportMessage(headers, body); - return _msgCallback(message, deliveryTag); - } + return _msgCallback(message, deliveryTag); } + } - public void BasicAck(ulong deliveryTag) - { - if (Model.IsOpen) - Model.BasicAck(deliveryTag, false); + public void BasicAck(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicAck(deliveryTag, false); - _semaphore.Release(); - } + _semaphore.Release(); + } - public void BasicReject(ulong deliveryTag) - { - if (Model.IsOpen) - Model.BasicReject(deliveryTag, true); + public void BasicReject(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicReject(deliveryTag, true); - _semaphore.Release(); - } + _semaphore.Release(); + } - public override async Task OnCancel(params string[] consumerTags) + public override async Task OnCancel(params string[] consumerTags) + { + await base.OnCancel(consumerTags); + + var args = new LogMessageEventArgs { - await base.OnCancel(consumerTags); + LogType = MqLogType.ConsumerCancelled, + Reason = string.Join(",", consumerTags) + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerCancelled, - Reason = string.Join(",", consumerTags) - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleBasicCancelOk(string consumerTag) + { + await base.HandleBasicCancelOk(consumerTag); - public override async Task HandleBasicCancelOk(string consumerTag) + var args = new LogMessageEventArgs { - await base.HandleBasicCancelOk(consumerTag); + LogType = MqLogType.ConsumerUnregistered, + Reason = consumerTag + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerUnregistered, - Reason = consumerTag - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleBasicConsumeOk(string consumerTag) + { + await base.HandleBasicConsumeOk(consumerTag); - public override async Task HandleBasicConsumeOk(string consumerTag) + var args = new LogMessageEventArgs { - await base.HandleBasicConsumeOk(consumerTag); + LogType = MqLogType.ConsumerRegistered, + Reason = consumerTag + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerRegistered, - Reason = consumerTag - }; + _logCallback(args); + } - _logCallback(args); - } + public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) + { + await base.HandleModelShutdown(model, reason); - public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) + var args = new LogMessageEventArgs { - await base.HandleModelShutdown(model, reason); + LogType = MqLogType.ConsumerShutdown, + Reason = reason.ReplyText + }; - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerShutdown, - Reason = reason.ReplyText - }; - - _logCallback(args); - } + _logCallback(args); } } diff --git a/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs b/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs index 8739b56d0..bd186d054 100644 --- a/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs +++ b/src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs @@ -1,87 +1,86 @@ -// Copyright (c) .NET Core Community. All rights reserved. +// Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. using System; using System.Diagnostics.Tracing; -namespace DotNetCore.CAP.Diagnostics +namespace DotNetCore.CAP.Diagnostics; + +[EventSource(Name = CapDiagnosticListenerNames.MetricListenerName)] +public class CapEventCounterSource : EventSource { - [EventSource(Name = CapDiagnosticListenerNames.MetricListenerName)] - public class CapEventCounterSource : EventSource - { - public static readonly CapEventCounterSource Log = new(); + public static readonly CapEventCounterSource Log = new(); - private IncrementingEventCounter? _publishPerSecondCounter; - private IncrementingEventCounter? _consumePerSecondCounter; - private IncrementingEventCounter? _subscriberInvokePerSecondCounter; + private IncrementingEventCounter? _publishPerSecondCounter; + private IncrementingEventCounter? _consumePerSecondCounter; + private IncrementingEventCounter? _subscriberInvokePerSecondCounter; - private EventCounter? _invokeCounter; + private EventCounter? _invokeCounter; - private CapEventCounterSource() { } + private CapEventCounterSource() { } - protected override void OnEventCommand(EventCommandEventArgs args) + protected override void OnEventCommand(EventCommandEventArgs args) + { + if (args.Command == EventCommand.Enable) { - if (args.Command == EventCommand.Enable) + _publishPerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.PublishedPerSec, this) { - _publishPerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.PublishedPerSec, this) - { - DisplayName = "Publish Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _consumePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.ConsumePerSec, this) - { - DisplayName = "Consume Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _subscriberInvokePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.InvokeSubscriberPerSec, this) - { - DisplayName = "Invoke Subscriber Rate", - DisplayRateTimeScale = TimeSpan.FromSeconds(1) - }; - - _invokeCounter ??= new EventCounter(CapDiagnosticListenerNames.InvokeSubscriberElapsedMs, this) - { - DisplayName = "Invoke Subscriber Elapsed Time", - DisplayUnits = "ms" - }; - } - } + DisplayName = "Publish Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WritePublishMetrics() - { - _publishPerSecondCounter?.Increment(); - } + _consumePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.ConsumePerSec, this) + { + DisplayName = "Consume Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WriteConsumeMetrics() - { - _consumePerSecondCounter?.Increment(); - } + _subscriberInvokePerSecondCounter ??= new IncrementingEventCounter(CapDiagnosticListenerNames.InvokeSubscriberPerSec, this) + { + DisplayName = "Invoke Subscriber Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; - public void WriteInvokeMetrics() - { - _subscriberInvokePerSecondCounter?.Increment(); + _invokeCounter ??= new EventCounter(CapDiagnosticListenerNames.InvokeSubscriberElapsedMs, this) + { + DisplayName = "Invoke Subscriber Elapsed Time", + DisplayUnits = "ms" + }; } + } - public void WriteInvokeTimeMetrics(double elapsedMs) - { - _invokeCounter?.WriteMetric(elapsedMs); - } + public void WritePublishMetrics() + { + _publishPerSecondCounter?.Increment(); + } - protected override void Dispose(bool disposing) - { - _publishPerSecondCounter?.Dispose(); - _consumePerSecondCounter?.Dispose(); - _subscriberInvokePerSecondCounter?.Dispose(); - _invokeCounter?.Dispose(); + public void WriteConsumeMetrics() + { + _consumePerSecondCounter?.Increment(); + } + + public void WriteInvokeMetrics() + { + _subscriberInvokePerSecondCounter?.Increment(); + } - _publishPerSecondCounter = null; - _consumePerSecondCounter = null; - _subscriberInvokePerSecondCounter = null; - _invokeCounter = null; + public void WriteInvokeTimeMetrics(double elapsedMs) + { + _invokeCounter?.WriteMetric(elapsedMs); + } - base.Dispose(disposing); - } + protected override void Dispose(bool disposing) + { + _publishPerSecondCounter?.Dispose(); + _consumePerSecondCounter?.Dispose(); + _subscriberInvokePerSecondCounter?.Dispose(); + _invokeCounter?.Dispose(); + + _publishPerSecondCounter = null; + _consumePerSecondCounter = null; + _subscriberInvokePerSecondCounter = null; + _invokeCounter = null; + + base.Dispose(disposing); } } diff --git a/src/DotNetCore.CAP/Internal/ISnowflakeId.cs b/src/DotNetCore.CAP/Internal/ISnowflakeId.cs index 5fc62e001..5c9cd9707 100644 --- a/src/DotNetCore.CAP/Internal/ISnowflakeId.cs +++ b/src/DotNetCore.CAP/Internal/ISnowflakeId.cs @@ -1,10 +1,9 @@ -// Copyright 2010-2012 Twitter, Inc. +// Copyright 2010-2012 Twitter, Inc. // An object that generates IDs. This is broken into a separate class in case we ever want to support multiple worker threads per process -namespace DotNetCore.CAP.Internal +namespace DotNetCore.CAP.Internal; + +public interface ISnowflakeId { - public interface ISnowflakeId - { - long NextId(); - } + long NextId(); } \ No newline at end of file