Skip to content

Commit

Permalink
Refactor to file-scoped namespaces (#1586)
Browse files Browse the repository at this point in the history
Modified : src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs
Modified : src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Modified : src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
Modified : src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs
Modified : src/DotNetCore.CAP.NATS/IConnectionPool.cs
Modified : src/DotNetCore.CAP.NATS/ITransport.NATS.cs
Modified : src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
Modified : src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs
Modified : src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs
Modified : src/DotNetCore.CAP/Diagnostics/EventCounterSource.Cap.cs
Modified : src/DotNetCore.CAP/Internal/ISnowflakeId.cs
  • Loading branch information
samanazadi1996 authored Sep 20, 2024
1 parent 8397a5a commit 1cadd3a
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 557 deletions.
33 changes: 16 additions & 17 deletions src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
Expand All @@ -7,26 +7,25 @@
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
namespace DotNetCore.CAP;

internal sealed class NATSCapOptionsExtension : ICapOptionsExtension
{
internal sealed class NATSCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<NATSOptions> _configure;
private readonly Action<NATSOptions> _configure;

public NATSCapOptionsExtension(Action<NATSOptions> configure)
{
_configure = configure;
}
public NATSCapOptionsExtension(Action<NATSOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream"));
public void AddServices(IServiceCollection services)
{
services.AddSingleton(new CapMessageQueueMakerService("NATS JetStream"));

services.Configure(_configure);
services.Configure(_configure);

services.AddSingleton<ITransport, NATSTransport>();
services.AddSingleton<IConsumerClientFactory, NATSConsumerClientFactory>();
services.AddSingleton<IConnectionPool, ConnectionPool>();
}
services.AddSingleton<ITransport, NATSTransport>();
services.AddSingleton<IConsumerClientFactory, NATSConsumerClientFactory>();
services.AddSingleton<IConnectionPool, ConnectionPool>();
}
}
73 changes: 36 additions & 37 deletions src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
Expand All @@ -7,43 +7,42 @@
using NATS.Client.JetStream;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
namespace DotNetCore.CAP;

/// <summary>
/// Provides programmatic configuration for the CAP NATS project.
/// </summary>
public class NATSOptions
{
/// <summary>
/// Provides programmatic configuration for the CAP NATS project.
/// Gets or sets the server url/urls used to connect to the NATs server.
/// </summary>
/// <remarks>This may contain username/password information.</remarks>
public string Servers { get; set; } = "nats://127.0.0.1:4222";

/// <summary>
/// connection pool size, default is 10
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true.
/// </summary>
public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true;

/// <summary>
/// Used to setup all NATs client options
/// </summary>
public Options? Options { get; set; }

public Action<StreamConfiguration.StreamConfigurationBuilder>? StreamOptions { get; set; }

public Action<ConsumerConfiguration.ConsumerConfigurationBuilder>? ConsumerOptions { get; set; }

/// <summary>
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader" />.
/// </summary>
public class NATSOptions
{
/// <summary>
/// Gets or sets the server url/urls used to connect to the NATs server.
/// </summary>
/// <remarks>This may contain username/password information.</remarks>
public string Servers { get; set; } = "nats://127.0.0.1:4222";

/// <summary>
/// connection pool size, default is 10
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true.
/// </summary>
public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true;

/// <summary>
/// Used to setup all NATs client options
/// </summary>
public Options? Options { get; set; }

public Action<StreamConfiguration.StreamConfigurationBuilder>? StreamOptions { get; set; }

public Action<ConsumerConfiguration.ConsumerConfigurationBuilder>? ConsumerOptions { get; set; }

/// <summary>
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader" />.
/// </summary>
public Func<MsgHandlerEventArgs, IServiceProvider, List<KeyValuePair<string, string>>>? CustomHeadersBuilder { get; set; }

public Func<string, string> NormalizeStreamName { get; set; } = origin => origin.Split('.')[0];
}
public Func<MsgHandlerEventArgs, IServiceProvider, List<KeyValuePair<string, string>>>? CustomHeadersBuilder { get; set; }

public Func<string, string> NormalizeStreamName { get; set; } = origin => origin.Split('.')[0];
}
57 changes: 28 additions & 29 deletions src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,44 +1,43 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
namespace Microsoft.Extensions.DependencyInjection;

public static class CapOptionsExtensions
{
public static class CapOptionsExtensions
/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">NATS bootstrap server urls.</param>
public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null)
{
/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">NATS bootstrap server urls.</param>
public static CapOptions UseNATS(this CapOptions options, string? bootstrapServers = null)
return options.UseNATS(opt =>
{
return options.UseNATS(opt =>
{
if (bootstrapServers != null)
opt.Servers = bootstrapServers;
});
}
if (bootstrapServers != null)
opt.Servers = bootstrapServers;
});
}

/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the NATS.</param>
/// <returns></returns>
public static CapOptions UseNATS(this CapOptions options, Action<NATSOptions> configure)
/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the NATS.</param>
/// <returns></returns>
public static CapOptions UseNATS(this CapOptions options, Action<NATSOptions> configure)
{
if (configure == null)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new NATSCapOptionsExtension(configure));
options.RegisterExtension(new NATSCapOptionsExtension(configure));

return options;
}
return options;
}
}
107 changes: 53 additions & 54 deletions src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
Expand All @@ -8,78 +8,77 @@
using Microsoft.Extensions.Options;
using NATS.Client;

namespace DotNetCore.CAP.NATS
namespace DotNetCore.CAP.NATS;

public class ConnectionPool : IConnectionPool, IDisposable
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private readonly NATSOptions _options;
private readonly ConcurrentQueue<IConnection> _connectionPool;
private readonly NATSOptions _options;
private readonly ConcurrentQueue<IConnection> _connectionPool;

private readonly ConnectionFactory _connectionFactory;
private int _pCount;
private int _maxSize;
private readonly ConnectionFactory _connectionFactory;
private int _pCount;
private int _maxSize;

public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<NATSOptions> options)
{
_options = options.Value;
_connectionPool = new ConcurrentQueue<IConnection>();
_connectionFactory = new ConnectionFactory();
_maxSize = _options.ConnectionPoolSize;
public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<NATSOptions> options)
{
_options = options.Value;
_connectionPool = new ConcurrentQueue<IConnection>();
_connectionFactory = new ConnectionFactory();
_maxSize = _options.ConnectionPoolSize;

logger.LogDebug("NATS configuration: {0}", options.Value.Options);
}
logger.LogDebug("NATS configuration: {0}", options.Value.Options);
}

public string ServersAddress => _options.Servers;
public string ServersAddress => _options.Servers;

public IConnection RentConnection()
public IConnection RentConnection()
{
if (_connectionPool.TryDequeue(out var connection))
{
if (_connectionPool.TryDequeue(out var connection))
{
Interlocked.Decrement(ref _pCount);

return connection;
}

if (_options.Options != null)
{
_options.Options.Url = _options.Servers;
connection = _connectionFactory.CreateConnection(_options.Options);
}
else
{
connection = _connectionFactory.CreateConnection(_options.Servers);
}
Interlocked.Decrement(ref _pCount);

return connection;
}

public bool Return(IConnection connection)
if (_options.Options != null)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED)
{
_connectionPool.Enqueue(connection);

return true;
}
_options.Options.Url = _options.Servers;
connection = _connectionFactory.CreateConnection(_options.Options);
}
else
{
connection = _connectionFactory.CreateConnection(_options.Servers);
}

if (!connection.IsReconnecting())
{
connection.Dispose();
}
return connection;
}

Interlocked.Decrement(ref _pCount);
public bool Return(IConnection connection)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize && connection.State == ConnState.CONNECTED)
{
_connectionPool.Enqueue(connection);

return false;
return true;
}

public void Dispose()
if (!connection.IsReconnecting())
{
_maxSize = 0;
connection.Dispose();
}

Interlocked.Decrement(ref _pCount);

while (_connectionPool.TryDequeue(out var context))
{
context.Dispose();
}
return false;
}

public void Dispose()
{
_maxSize = 0;

while (_connectionPool.TryDequeue(out var context))
{
context.Dispose();
}
}
}
15 changes: 7 additions & 8 deletions src/DotNetCore.CAP.NATS/IConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using NATS.Client;

namespace DotNetCore.CAP.NATS
namespace DotNetCore.CAP.NATS;

public interface IConnectionPool
{
public interface IConnectionPool
{
string ServersAddress { get; }
string ServersAddress { get; }

IConnection RentConnection();
IConnection RentConnection();

bool Return(IConnection connection);
}
bool Return(IConnection connection);
}
Loading

0 comments on commit 1cadd3a

Please sign in to comment.