Azure Service Bus Custom Consumer #1571

Open
KuchaD opened this issue Aug 9, 2024 · 13 comments

KuchaD commented Aug 9, 2024

Current Situation:
Currently, our Azure Service Bus provider lacks support for multiple namespaces and topics for consumers. We are utilizing a single topic within Azure Service Bus for messaging, and a single namespace for the application.

Solution Attempt:
I have attempted to implement a solution that addresses this limitation, and it works as expected. However, I am currently unable to submit a pull request (PR) for this update.

Registration in the program file:

builder.Services.AddCap(c =>
{
    c.Version = "v1";
    c.UseInMemoryStorage();
    c.UseAzureServiceBus(asb =>
    {
        asb.ConnectionString =
            "Endpoint=xxx";
        asb.CustomHeadersBuilder = (message, serviceProvider) =>
        {
            var snowFlakeId = serviceProvider.GetRequiredService<ISnowflakeId>();

            return new List<KeyValuePair<string, string>>()
            {
                new(DotNetCore.CAP.Messages.Headers.MessageId,
                    snowFlakeId.NextId().ToString()),
                new(DotNetCore.CAP.Messages.Headers.MessageName, message.Subject),
                new("IsFromSampleProject", "'true'")
            };
        };
        asb.SQLFilters = new List<KeyValuePair<string, string>>()
        {
            new("IsFromSampleProjectFilter", "IsFromSampleProject = 'true'")
        };

        asb.ConfigureCustomProducer<EntityCreatedForIntegration>(cfg => cfg.UseTopic("entity-created").WithSubscription());
        asb.ConfigureCustomProducer<EntityDeletedForIntegration>(cfg => cfg.UseTopic("entity-deleted").WithSubscription());
        asb.ConfigureCustomConsumer(cfg =>
        {
            cfg.UseGroupName($"test.{c.Version}");
            cfg.UseTopic("entity-created");
        });
    });

    c.UseDashboard();
});

Consumer

    [CapSubscribe(nameof(EntityCreatedForIntegration), Group = "test")]
    public void Handle(EntityCreatedForIntegration message)
    {
        Console.WriteLine($"Message {message.Id} received");
    }

CustomConsumer

public class ServiceBusConsumerDescriptorBuilder
{
    private string GroupName { get; set; } = null!;
    private string? Namespace { get; set; } = null;
    private string TopicPath { get; set; } = null!;
    
    
    public ServiceBusConsumerDescriptorBuilder UseGroupName(string groupName)
    {
        GroupName = groupName;
        return this;
    }

    public ServiceBusConsumerDescriptorBuilder UseTopic(string topicPath)
    {
        TopicPath = topicPath;
        return this;
    }
    
    public ServiceBusConsumerDescriptorBuilder UseNamespace(string @namespace)
    {
        Namespace = @namespace;
        return this;
    }
    

    public KeyValuePair<string, IServiceBusConsumerDescriptor> Build()
    {
        return new KeyValuePair<string, IServiceBusConsumerDescriptor> (GroupName, new ServiceBusConsumerDescriptor(TopicPath, Namespace));
    }
}

public interface IServiceBusConsumerDescriptor
{
    string TopicPath { get; }
    string? Namespace { get; }
}

public class ServiceBusConsumerDescriptor : IServiceBusConsumerDescriptor
{
    public ServiceBusConsumerDescriptor(string topicPath, string? @namespace)
    {
        TopicPath = topicPath;
        Namespace = @namespace;
    }

    public string TopicPath { get; }
    public string? Namespace { get; }
}
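
The post does not show how ConfigureCustomConsumer fills the CustomConsumers dictionary that the client factory below reads with TryGetValue. A minimal sketch of one way it could work, under stated assumptions: SketchOptions, ConsumerDescriptor and ConsumerDescriptorBuilder are hypothetical stand-ins written for this example, not the types from the PR or from CAP, and the validation in Build() is an addition that the original builder does not have.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the transport options; only the dictionary matters here.
var options = new SketchOptions();
options.ConfigureCustomConsumer(cfg => cfg.UseGroupName("test.v1").UseTopic("entity-created"));

// Factory-side lookup: a registered group resolves to its descriptor,
// any other group would fall back to the default consumer settings.
if (options.CustomConsumers.TryGetValue("test.v1", out var found))
    Console.WriteLine($"test.v1 -> {found.TopicPath}");
Console.WriteLine(options.CustomConsumers.ContainsKey("other.v1"));

public sealed record ConsumerDescriptor(string TopicPath, string? Namespace);

public sealed class ConsumerDescriptorBuilder
{
    private string? _groupName;
    private string? _topicPath;
    private string? _namespace;

    public ConsumerDescriptorBuilder UseGroupName(string groupName) { _groupName = groupName; return this; }
    public ConsumerDescriptorBuilder UseTopic(string topicPath) { _topicPath = topicPath; return this; }
    public ConsumerDescriptorBuilder UseNamespace(string @namespace) { _namespace = @namespace; return this; }

    public KeyValuePair<string, ConsumerDescriptor> Build()
    {
        // Assumption: fail at configuration time instead of at connect time.
        if (string.IsNullOrWhiteSpace(_groupName))
            throw new InvalidOperationException("Group name is required.");
        if (string.IsNullOrWhiteSpace(_topicPath))
            throw new InvalidOperationException("Topic path is required.");
        return new KeyValuePair<string, ConsumerDescriptor>(
            _groupName, new ConsumerDescriptor(_topicPath, _namespace));
    }
}

public sealed class SketchOptions
{
    // Keyed by consumer group name, matching the lookup done in the factory.
    public Dictionary<string, ConsumerDescriptor> CustomConsumers { get; } = new();

    public SketchOptions ConfigureCustomConsumer(Action<ConsumerDescriptorBuilder> configure)
    {
        var builder = new ConsumerDescriptorBuilder();
        configure(builder);
        var (groupName, descriptor) = builder.Build();
        CustomConsumers[groupName] = descriptor; // last registration for a group wins
        return this;
    }
}
```

Keying the dictionary by the full group name (including the version suffix, as in the registration above) keeps the factory lookup a single TryGetValue call.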

Client factory

internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory
{
    private readonly IOptions<AzureServiceBusOptions> _asbOptions;
    private readonly ILoggerFactory _loggerFactory;
    private readonly IServiceProvider _serviceProvider;

    public AzureServiceBusConsumerClientFactory(
        ILoggerFactory loggerFactory,
        IOptions<AzureServiceBusOptions> asbOptions,
        IServiceProvider serviceProvider)
    {
        _loggerFactory = loggerFactory;
        _asbOptions = asbOptions;
        _serviceProvider = serviceProvider;
    }

    public IConsumerClient Create(string groupName, byte groupConcurrent)
    {
        try
        {
            var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
            if (_asbOptions.Value.CustomConsumers.TryGetValue(groupName, out var customConsumer))
            {
                var customClient = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider, customConsumer);
                customClient.ConnectAsync().GetAwaiter().GetResult();
                return customClient;
            }
            
            var client = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider);
            client.ConnectAsync().GetAwaiter().GetResult();
            return client;
        }
        catch (Exception e)
        {
            throw new BrokerConnectionException(e);
        }
    }
}

Constructor

    public AzureServiceBusConsumerClient(
        ILogger logger,
        string subscriptionName,
        byte groupConcurrent,
        IOptions<AzureServiceBusOptions> options,
        IServiceProvider serviceProvider,
        IServiceBusConsumerDescriptor? consumerDescriptor = null
        )
    {
        _logger = logger;
        _subscriptionName = subscriptionName;
        _groupConcurrent = groupConcurrent;
        _semaphore = new SemaphoreSlim(groupConcurrent);
        _serviceProvider = serviceProvider;
        _asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
      
        if (consumerDescriptor is not null)
        {
            _asbOptions.TopicPath = consumerDescriptor.TopicPath;
            _asbOptions.Namespace = consumerDescriptor.Namespace ?? _asbOptions.Namespace;
        }
        
        CheckValidSubscriptionName(subscriptionName);
    }
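
One thing to watch in the constructor above: options.Value typically hands back one shared options instance, so assigning TopicPath and Namespace on it would also change what every other consumer sees. A minimal sketch of an alternative that resolves per-consumer values into a separate object; SharedOptions, Descriptor and EffectiveSettings are hypothetical names used only for this illustration, not types from CAP.

```csharp
#nullable enable
using System;

var shared = new SharedOptions { TopicPath = "cap", Namespace = "ns-default" };

// A custom consumer overrides the topic but inherits the namespace.
var custom = EffectiveSettings.Resolve(shared, new Descriptor("entity-created", null));
// A default consumer has no descriptor and uses the shared values as-is.
var fallback = EffectiveSettings.Resolve(shared, null);

Console.WriteLine($"{custom.TopicPath} / {custom.Namespace}");
Console.WriteLine($"{fallback.TopicPath} / {fallback.Namespace}");
Console.WriteLine(shared.TopicPath); // the shared instance was never written to

// Hypothetical stand-in for the shared transport options.
public sealed class SharedOptions
{
    public string TopicPath { get; set; } = "";
    public string Namespace { get; set; } = "";
}

// Hypothetical per-group override, mirroring IServiceBusConsumerDescriptor.
public sealed record Descriptor(string TopicPath, string? Namespace);

// Immutable per-client view: descriptor values win, shared values fill the gaps.
public sealed record EffectiveSettings(string TopicPath, string Namespace)
{
    public static EffectiveSettings Resolve(SharedOptions shared, Descriptor? descriptor) =>
        descriptor is null
            ? new EffectiveSettings(shared.TopicPath, shared.Namespace)
            : new EffectiveSettings(descriptor.TopicPath, descriptor.Namespace ?? shared.Namespace);
}
```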

@Admi99

Admi99 commented Aug 17, 2024

@yang-xiaodong
This would be really useful for us too.
Would it be a problem to add it?

@yang-xiaodong
Member

@mviegas What do you think about this feature?

@mviegas
Contributor

mviegas commented Aug 19, 2024

It makes sense to me. I always envisioned such a feature. It provides more flexibility in the pub/sub configuration in similar ways to other libraries, such as MassTransit.

A couple of things that come to my mind are:

  • We should ideally avoid pitfalls such as feature differences between default and custom consumers. Example: SQL Filters and Sessions. Those features are currently already supported by the default consumer configuration, and should also continue to be supported for multiple consumer configurations. That would be a suggestion I would make to enhance feat: asb custom consumers #1572.
  • For cohesion, the API for custom consumers could follow a similar approach to the custom producers feature introduced in Allow publishing to multiple Service Bus topics (attempt 2) #1283.

@KuchaD
Author

KuchaD commented Aug 19, 2024

@mviegas Thanks for the quick response.

  • First Step: I've added support for a base feature to the custom consumer. Now, you can add your own settings per consumer or inherit from the base settings, depending on your preference. This update has already been added to the documentation.

  • Second Step: The next step is a bit trickier because I’m handling this at the group level rather than at the object type level. I still want to allow subscriptions to two types within one consumer group. I’m working on a better configuration for this and will update the documentation and example programs accordingly.

c.UseAzureServiceBus(asb =>
{
    asb.ConnectionString = ...

    asb.ConfigureCustomGroupConsumer("test", cfg =>
    {
        cfg.UseTopic("entity-created");
        cfg.UseConnectionString("external connection string");
        cfg.UseDefaultOptions(); // Use default options from default consumer
    });

    asb.ConfigureCustomGroupConsumer("test2", cfg =>
    {
        cfg.UseTopic("entity-deleted");
        // Set custom options for this consumer
        cfg.Configuration(c =>
        {
            c.EnableSessions = true;
        });
    });
});

I hope that is what you expected.

@mviegas
Contributor

mviegas commented Aug 19, 2024

Thanks for your prompt reply @KuchaD. I think I might have misread what your suggestion was. So we're not going deep on a type level, but rather on a topic (group) level. I have to think a bit more just to make sure we deliver a coherent and long-lived API for this, if you could give me a couple of days to reflect on it before a final decision it'd be great.

Meanwhile, I will start the code review in the PR for what's currently there, it's a great kickstart.

@mviegas
Contributor

mviegas commented Aug 19, 2024

Another question that comes to my mind: are you proposing to support multiple topics within a single namespace? Or multiple topics in multiple namespaces? If the latter is the answer, what is the motivation behind it?

@KuchaD
Author

KuchaD commented Aug 20, 2024

@mviegas I have already addressed the review comments.

We have a lot of microservices, and each microservice has a topic per message type.
The motivation, as I understand it (I was not there when the application was started), is that we can see in the portal who subscribes to which message type without having to check every subscriber's filter, and when somebody does not process messages correctly we can see at first glance where the problem is.

Example:

Service A with namespace NA
Produces message MA1 to topic A-namespace
Produces message MA2 to topic A-namespace
Consumes:
Message MC1 from NC

Service B with namespace NB
Produces message MB1 to topic B1
Produces message MB2 to topic B2
Consumes:
Message MA1 from NA
Message MC1 from NC

Service C with namespace NC
Produces message MC1 to topic C1
Produces message MC2 to topic C2
Consumes:
Message MB2 from NB

@mviegas
Contributor

mviegas commented Aug 20, 2024

Hi @KuchaD! Thanks for your feedback.

After some more thorough thoughts, I have no issues with the approach of producing/consuming multiple topics within the same namespace. However supporting multiple namespaces would mean supporting multiple broker addresses, which is something that goes against current CAP contracts and would also introduce a lot of background work not only in the transport layer but also in the framework itself to ensure proper connection management (multiple topologies, resiliency, etc.). cc @yang-xiaodong

That way, I think that we can move forward with the approach of adding custom topic consumers for the same namespace, but not for multiple namespaces. In practice, this would mean that every ConsumerDescriptor as proposed in #1572 should only have Entity-related properties (like EnableSessions, message TTL), not Namespace-related properties (like Connection String/Credentials).
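
To illustrate the split described above, here is a minimal sketch of a descriptor that carries only entity-level settings. TopicConsumerSettings and its property names are hypothetical and chosen for this example; they are not the types proposed in #1572.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// One entry per consumer group; all of them share the single namespace
// that is configured once on the transport itself.
var consumers = new Dictionary<string, TopicConsumerSettings>
{
    ["test.v1"] = new("entity-created"),
    ["test2.v1"] = new("entity-deleted")
    {
        EnableSessions = true,
        DefaultMessageTimeToLive = TimeSpan.FromDays(1),
    },
};

foreach (var (group, settings) in consumers)
    Console.WriteLine($"{group}: {settings.TopicPath}, sessions={settings.EnableSessions}");

// Entity-level settings only. There is deliberately no connection string,
// namespace, or credential here, so a consumer cannot point at another broker.
public sealed record TopicConsumerSettings(string TopicPath)
{
    public bool EnableSessions { get; init; }
    public TimeSpan? DefaultMessageTimeToLive { get; init; }
}
```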

@KuchaD
Author

KuchaD commented Aug 20, 2024

Hi @mviegas, I understand the problem, since BrokerAddress is used in ConsumerRegister. But if you use TokenCredential, you don't need BrokerAddress, do you? What do you think about supporting multiple namespaces when TokenCredential is used? I only had a quick look at the implementation, so I may be wrong.

@mviegas
Contributor

mviegas commented Aug 20, 2024

@KuchaD you're right that there's a bug in the BrokerAddress setter for the ASB transport. It should be set to either the ConnectionString or, when authentication is done via TokenCredential, the Namespace. Currently it doesn't consider the latter.
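
The rule described here could be sketched roughly as follows. BrokerEndpoint.Resolve and its parameters are hypothetical and only illustrate the selection logic; this is not the actual CAP setter, and the namespace value is a made-up placeholder.

```csharp
#nullable enable
using System;

// Connection-string authentication: the connection string identifies the broker.
Console.WriteLine(BrokerEndpoint.Resolve("Endpoint=xxx", null, usesTokenCredential: false));
// TokenCredential authentication: there is no connection string, so use the namespace.
Console.WriteLine(BrokerEndpoint.Resolve(null, "example.servicebus.windows.net", usesTokenCredential: true));

public static class BrokerEndpoint
{
    // Picks the value that identifies the broker for the active authentication mode.
    public static string Resolve(string? connectionString, string? @namespace, bool usesTokenCredential)
    {
        var value = usesTokenCredential ? @namespace : connectionString;
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException(
                "Either a connection string or a namespace with a TokenCredential must be configured.");
        return value;
    }
}
```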

That said, the rule for connecting to a single namespace is still valid. We should keep it that way to keep the library standards with other transports as well.

@KuchaD
Author

KuchaD commented Aug 24, 2024

@mviegas Thanks for the explanation. Do you think you will support multiple Azure Service Bus instances (namespaces) in the future, as MassTransit does? For the company where I work, this is really necessary for using CAP in our projects. We currently have one project with CAP and we are really satisfied with it, but this is a big blocker for our migration from MassTransit to CAP.

@KuchaD
Author

KuchaD commented Aug 24, 2024

I am open to contributing on this if you are willing to help me design a solution and figure out how to implement it in CAP.

To start, I am fixing the solution for a single namespace :)

Thanks

@mviegas
Contributor

mviegas commented Aug 25, 2024

@KuchaD, yes I'm aware that MassTransit allows you to configure multiple buses with some adjustments using .NET DI, where each bus has its own configuration, including the broker address/namespace. In CAP, configuring multiple buses isn't currently possible. If this changes in the future, we can adapt the ASB transport to support it. But it should be something coming from the framework first.
