Skip to content

Commit

Permalink
Adding default correlation headers to configure additional message co…
Browse files Browse the repository at this point in the history
…rrelation for Azure Service Bus. (#1497)

* Adding option to disable automation message correlation

* Added correlation properties collection
  • Loading branch information
demorgi authored Mar 6, 2024
1 parent 2194ef5 commit dac69cf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
23 changes: 12 additions & 11 deletions docs/content/user-guide/en/transport/azure-service-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ public void ConfigureServices(IServiceCollection services)

The AzureServiceBus configuration options provided directly by the CAP:

| NAME | DESCRIPTION | TYPE | DEFAULT |
| :---------------------- | :------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------- | :---------------- |
| ConnectionString | Endpoint address | string | |
| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false |
| TopicPath | Topic entity path | string | cap |
| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue |
| ManagementTokenProvider | Token provider | ITokenProvider | null |
| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false |
| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func<Message, List<KeyValuePair<string, string>>>?` | null |
| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null |
| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List<KeyValuePair<string, string>> | null |
| NAME | DESCRIPTION | TYPE | DEFAULT |
|:------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|:----------------------------------|
| ConnectionString | Endpoint address | string | |
| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false |
| TopicPath | Topic entity path | string | cap |
| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue |
| ManagementTokenProvider | Token provider | ITokenProvider | null |
| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false |
| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func<Message, List<KeyValuePair<string, string>>>?` | null |
| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null |
| DefaultCorrelationHeaders | Adds additional correlation properties to all [correlation filters](https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters). | IDictionary<string, string> | Dictionary<string, string>.Empty |
| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List<KeyValuePair<string, string>> | null |

#### Sessions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public void Subscribe(IEnumerable<string> topics)
ConnectAsync().GetAwaiter().GetResult();

topics = topics.Concat(_asbOptions!.SQLFilters?.Select(o => o.Key) ?? Enumerable.Empty<string>());

var allRules = _administrationClient!.GetRulesAsync(_asbOptions!.TopicPath, _subscriptionName).ToEnumerable();
var allRuleNames = allRules.Select(o => o.Name);


foreach (var newRule in topics.Except(allRuleNames))
{
var isSqlRule = _asbOptions.SQLFilters?.FirstOrDefault(o => o.Key == newRule).Value is not null;
Expand All @@ -71,10 +71,17 @@ public void Subscribe(IEnumerable<string> topics)
}
else
{
currentRuleToAdd = new CorrelationRuleFilter
var correlationRule = new CorrelationRuleFilter
{
Subject = newRule
};

foreach (var correlationHeader in _asbOptions.DefaultCorrelationHeaders)
{
correlationRule.ApplicationProperties.Add(correlationHeader.Key, correlationHeader.Value);
}

currentRuleToAdd = correlationRule;
}

_administrationClient.CreateRuleAsync(_asbOptions.TopicPath, _subscriptionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class AzureServiceBusOptions
/// </summary>
public bool AutoCompleteMessages { get; set; }

/// <summary>
/// Adds additional correlation properties to all correlation filters.
/// https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters
/// </summary>
public IDictionary<string, string> DefaultCorrelationHeaders { get; } = new Dictionary<string, string>();

/// <summary>
/// Gets the maximum number of concurrent calls to the ProcessMessageAsync message handler the processor should initiate.
/// </summary>
Expand Down

0 comments on commit dac69cf

Please sign in to comment.