-
Notifications
You must be signed in to change notification settings - Fork 0
Spike function manifests #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cc5644c
d8b8b1f
06441cc
91d6e01
06b5ae0
d1a1616
f9c4137
0ca7748
cd1fe8e
de0ed25
a04e6ee
2bd1d46
4587313
f3252bb
0954be7
4ce8eb5
315bbd7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| namespace NServiceBus; | ||
|
|
||
| using Microsoft.Azure.Functions.Worker.Builder; | ||
|
|
||
| public static class NServiceBusEndpoints | ||
| { | ||
| public static FunctionManifest ReceiverEndpoint = new("ReceiverEndpoint", "ReceiverEndpoint", "AzureWebJobsServiceBus"); | ||
| public static FunctionManifest AnotherReceiverEndpoint = new("AnotherReceiverEndpoint", "AnotherReceiverEndpoint", "AzureWebJobsServiceBus"); | ||
| } | ||
|
|
||
|
|
||
| public record AnotherReceiverEndpoint2() : FunctionManifest("AnotherReceiverEndpoint2", "AnotherReceiverEndpoint2", "AzureWebJobsServiceBus"); | ||
|
|
||
| public record AnotherReceiverEndpoint3() : FunctionManifest("AnotherReceiverEndpoint3", "AnotherReceiverEndpoint3", "AzureWebJobsServiceBus"); | ||
|
|
||
| public static class FunctionsHostApplicationBuilderExtensions | ||
| { | ||
| public static void AddAnotherEndpoint3NServiceBusFunction( | ||
| this FunctionsApplicationBuilder builder, | ||
| Action<EndpointConfiguration> configure) => | ||
| builder.AddNServiceBusFunction<AnotherReceiverEndpoint3>(configure); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,16 +12,36 @@ | |
|
|
||
| builder.Services.AddHostedService<InitializeLogger>(); | ||
|
|
||
| builder.AddNServiceBusFunction("SenderEndpoint", endpoint => | ||
| // Send-only using a separate API since they are not "functions" | ||
| builder.AddSendOnlyNServiceBusEndpoint("SenderEndpoint", endpoint => | ||
| { | ||
| endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)); | ||
| endpoint.SendOnly(); | ||
| var transport = new AzureServiceBusServerlessTransport(TopicTopology.Default) | ||
| { | ||
| //send only endpoints might need to set the connection name | ||
| ConnectionName = "AzureWebJobsServiceBus" | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is the use case where users might want to control the connection name, if so we might want to add this option to |
||
| }; | ||
|
|
||
| var routing = endpoint.UseTransport(transport); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Demo use of routing |
||
|
|
||
| routing.RouteToEndpoint(typeof(TriggerMessage), "ReceiverEndpoint"); | ||
| endpoint.UseSerialization<SystemJsonSerializer>(); | ||
| }); | ||
|
|
||
|
|
||
| //NOTE: forgetting to register a function leads to "Exception: Unable to resolve service for type 'NServiceBus.AzureFunctions.AzureServiceBus.IMessageProcessor' while attempting to activate 'IntegrationTest.AnotherReceiverEndpoint3Function'." | ||
|
|
||
| //option 1: No source gen on the configuration side, user needs to use correct name, queue and connection name as the function definition | ||
| builder.AddNServiceBusFunction("ReceiverEndpoint", endpoint => | ||
| { | ||
| endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)); | ||
| var transport = new AzureServiceBusServerlessTransport(TopicTopology.Default) | ||
| { | ||
| //this needs to match | ||
| ConnectionName = "AzureWebJobsServiceBus" | ||
| }; | ||
|
|
||
| // if they differ this needs to be done | ||
| endpoint.OverrideLocalAddress("ReceiverEndpoint"); | ||
| endpoint.UseTransport(transport); | ||
| endpoint.EnableInstallers(); | ||
| endpoint.UsePersistence<LearningPersistence>(); | ||
| endpoint.UseSerialization<SystemJsonSerializer>(); | ||
|
|
@@ -31,17 +51,37 @@ | |
| endpoint.AddHandler<SomeEventMessageHandler>(); | ||
| }); | ||
|
|
||
| builder.AddNServiceBusFunction("AnotherReceiverEndpoint", endpoint => | ||
| //option 2: Pass in a manifest that we have source genned | ||
| builder.AddNServiceBusFunction(NServiceBusEndpoints.AnotherReceiverEndpoint, configuration => | ||
| { | ||
| endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default) | ||
| { | ||
| ConnectionName = "AnotherServiceBusConnection" | ||
| }); | ||
| endpoint.EnableInstallers(); | ||
| endpoint.UsePersistence<LearningPersistence>(); | ||
| endpoint.UseSerialization<SystemJsonSerializer>(); | ||
| configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)); | ||
| configuration.EnableInstallers(); | ||
| configuration.UsePersistence<LearningPersistence>(); | ||
| configuration.UseSerialization<SystemJsonSerializer>(); | ||
|
|
||
| endpoint.AddHandler<SomeEventMessageHandler>(); | ||
| configuration.AddHandler<SomeEventMessageHandler>(); | ||
| }); | ||
|
|
||
| //option 3: Use a type that we have source genned | ||
| builder.AddNServiceBusFunction<AnotherReceiverEndpoint2>(configuration => | ||
| { | ||
| configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)); | ||
| configuration.EnableInstallers(); | ||
| configuration.UsePersistence<LearningPersistence>(); | ||
| configuration.UseSerialization<SystemJsonSerializer>(); | ||
|
|
||
| configuration.AddHandler<SomeEventMessageHandler>(); | ||
| }); | ||
|
|
||
| //option 4: Use source genned method | ||
| builder.AddAnotherEndpoint3NServiceBusFunction(configuration => | ||
| { | ||
| configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)); | ||
| configuration.EnableInstallers(); | ||
| configuration.UsePersistence<LearningPersistence>(); | ||
| configuration.UseSerialization<SystemJsonSerializer>(); | ||
|
|
||
| configuration.AddHandler<SomeEventMessageHandler>(); | ||
| }); | ||
|
|
||
| var host = builder.Build(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,26 +7,56 @@ namespace IntegrationTest; | |
| using Microsoft.Extensions.DependencyInjection; | ||
| using NServiceBus.AzureFunctions.AzureServiceBus; | ||
|
|
||
| public class ReceiverEndpoint([FromKeyedServices("ReceiverEndpoint")] IMessageProcessor processor) | ||
| public class ReceiverEndpointFunction([FromKeyedServices("ReceiverEndpoint")] IMessageProcessor processor) | ||
| { | ||
| [Function("ReceiverEndpoint")] | ||
| public Task Receiver( | ||
| [ServiceBusTrigger("ReceiverEndpoint", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] | ||
| ServiceBusReceivedMessage message, | ||
| ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken = default) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These were leftovers from the old repo and is, at least currently, not needed |
||
| CancellationToken cancellationToken = default) | ||
| { | ||
| return processor.Process(message, messageActions, context, cancellationToken); | ||
| return processor.Process(message, cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| public class AnotherReceiverEndpoint([FromKeyedServices("AnotherReceiverEndpoint")] IMessageProcessor processor) | ||
| public class AnotherReceiverEndpointFunction([FromKeyedServices("AnotherReceiverEndpoint")] IMessageProcessor processor) | ||
| { | ||
| [Function("AnotherReceiverEndpoint")] | ||
| public Task Receiver( | ||
| [ServiceBusTrigger("AnotherReceiverEndpoint", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] | ||
| ServiceBusReceivedMessage message, | ||
| ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken = default) | ||
| ServiceBusReceivedMessage message, CancellationToken cancellationToken = default) | ||
| { | ||
| return processor.Process(message, cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| public class AnotherReceiverEndpoint2Function([FromKeyedServices("AnotherReceiverEndpoint2")] IMessageProcessor processor) | ||
| { | ||
| [Function("AnotherReceiverEndpoint2")] | ||
| public Task Receiver( | ||
| [ServiceBusTrigger("AnotherReceiverEndpoint2", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] | ||
| ServiceBusReceivedMessage message, CancellationToken cancellationToken = default) | ||
| { | ||
| return processor.Process(message, messageActions, context, cancellationToken); | ||
| return processor.Process(message, cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| public class AnotherReceiverEndpoint3Function | ||
| { | ||
| [Function("AnotherReceiverEndpoint3")] | ||
| public Task Receiver( | ||
| [ServiceBusTrigger("AnotherReceiverEndpoint3", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] | ||
| ServiceBusReceivedMessage message, FunctionContext functionContext, CancellationToken cancellationToken = default) | ||
| { | ||
| //demo using service locator to avoid having to add a ctor | ||
| var processor = functionContext.InstanceServices.GetKeyedService<IMessageProcessor>("AnotherReceiverEndpoint3"); | ||
|
|
||
| if (processor is null) | ||
| { | ||
| //which also allows us to throw a better exception | ||
| throw new InvalidOperationException("AnotherReceiverEndpoint3 is not configured, please add AddBlahBla to your program.cs"); | ||
| } | ||
|
|
||
| return processor.Process(message, cancellationToken); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| namespace NServiceBus; | ||
|
|
||
| using Microsoft.Extensions.Hosting; | ||
|
|
||
| public class FunctionConfigurationValidator : IHostedService | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes sure that all functions that we found have been configured by the user |
||
| { | ||
| public Task StartAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| //TODO: See if we can do this in some other way | ||
| // var allFunctions = FunctionsRegistry.GetAll(); | ||
| // | ||
| // var functionNotConfigured = allFunctions.Where(f => !f.Configured).Select(f => f.Name).ToArray(); | ||
| // if (functionNotConfigured.Any()) | ||
| // { | ||
| // throw new InvalidOperationException($"The following functions have not been configured using {nameof(FunctionsHostApplicationBuilderExtensions.AddNServiceBusFunction)}(...): {string.Join(", ", functionNotConfigured)}"); | ||
| // } | ||
|
|
||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| public Task StopAsync(CancellationToken cancellationToken = default) => Task.CompletedTask; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| namespace NServiceBus; | ||
|
|
||
| public record FunctionManifest(string Name, string Queue, string ConnectionName) | ||
| { | ||
| public bool Configured { get; set; } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ namespace NServiceBus; | |
|
|
||
| using System; | ||
| using AzureFunctions.AzureServiceBus; | ||
| using Configuration.AdvancedExtensibility; | ||
| using Microsoft.Azure.Functions.Worker.Builder; | ||
| using Microsoft.Extensions.Azure; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
|
|
@@ -10,25 +11,92 @@ namespace NServiceBus; | |
|
|
||
| public static class FunctionsHostApplicationBuilderExtensions | ||
| { | ||
| public static void AddNServiceBusFunction<TFunctionManifest>( | ||
| this FunctionsApplicationBuilder builder, | ||
| Action<EndpointConfiguration> configure) where TFunctionManifest : FunctionManifest, new() | ||
| { | ||
| var manifest = new TFunctionManifest(); | ||
| builder.AddNServiceBusFunction(manifest, configure); | ||
| } | ||
|
|
||
| public static void AddNServiceBusFunction( | ||
| this FunctionsApplicationBuilder builder, | ||
| string endpointName, | ||
| Action<EndpointConfiguration> configure) => | ||
| builder.AddNServiceBusFunction(new FunctionManifest(endpointName, endpointName, AzureServiceBusServerlessTransport.DefaultServiceBusConnectionName), configure); | ||
|
|
||
| public static void AddNServiceBusFunction( | ||
| this FunctionsApplicationBuilder builder, | ||
| FunctionManifest functionManifest, | ||
| Action<EndpointConfiguration> configure) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(builder); | ||
| ArgumentNullException.ThrowIfNull(endpointName); | ||
| ArgumentNullException.ThrowIfNull(functionManifest); | ||
| ArgumentNullException.ThrowIfNull(configure); | ||
|
|
||
| builder.Services.AddAzureClientsCore(); | ||
|
|
||
| builder.AddNServiceBusEndpoint(endpointName, configure); | ||
| var endpointName = functionManifest.Name; | ||
| builder.AddNServiceBusEndpoint(endpointName, endpointConfiguration => | ||
| { | ||
| configure(endpointConfiguration); | ||
|
|
||
| var settings = endpointConfiguration.GetSettings(); | ||
| if (settings.GetOrDefault<bool>(AzureServiceBusServerlessTransport.SendOnlyConfigKey)) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we have a separate code path for send-only we can guard here |
||
| { | ||
| throw new InvalidOperationException($"Functions can't be send only endpoints, use {nameof(AddSendOnlyNServiceBusEndpoint)}"); | ||
| } | ||
|
|
||
| var transport = GetTransport(settings); | ||
|
|
||
|
|
||
| if (functionManifest.Name != functionManifest.Queue) | ||
| { | ||
| endpointConfiguration.OverrideLocalAddress(functionManifest.Queue); | ||
| } | ||
|
|
||
| transport.ConnectionName = functionManifest.ConnectionName; | ||
|
|
||
| functionManifest.Configured = true; | ||
|
|
||
| builder.Services.AddHostedService<FunctionConfigurationValidator>(); | ||
|
|
||
| builder.Services.AddKeyedSingleton<IMessageProcessor>(endpointName, (sp, _) => new MessageProcessor(transport, sp.GetRequiredKeyedService<MultiHosting.EndpointStarter>(endpointName))); | ||
| }); | ||
| } | ||
|
|
||
| public static void AddSendOnlyNServiceBusEndpoint( | ||
| this FunctionsApplicationBuilder builder, | ||
| string endpointName, | ||
| Action<EndpointConfiguration> configure) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(builder); | ||
| ArgumentNullException.ThrowIfNull(endpointName); | ||
| ArgumentNullException.ThrowIfNull(configure); | ||
|
|
||
| builder.Services.AddKeyedSingleton<IMessageProcessor>(endpointName, (sp, _) => | ||
| builder.AddNServiceBusEndpoint(endpointName, endpointConfiguration => | ||
| { | ||
| var settings = sp.GetRequiredKeyedService<IReadOnlySettings>(endpointName); | ||
| var transport = settings.Get<TransportDefinition>() as AzureServiceBusServerlessTransport | ||
| ?? throw new InvalidOperationException($"Endpoint '{endpointName}' must be configured with an AzureServiceBusServerlessTransport."); | ||
| return new MessageProcessor(transport, sp.GetRequiredKeyedService<MultiHosting.EndpointStarter>(endpointName)); | ||
| configure(endpointConfiguration); | ||
|
|
||
| endpointConfiguration.SendOnly(); | ||
|
|
||
| // Make sure that the correct transport is used | ||
| _ = GetTransport(endpointConfiguration.GetSettings()); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ugly but gets the job done for now to make sure that the user has configured a AzureServiceBusServerlessTransport |
||
| }); | ||
| } | ||
|
|
||
| static AzureServiceBusServerlessTransport GetTransport(SettingsHolder settings) | ||
| { | ||
| if (!settings.TryGet(out TransportDefinition transport)) | ||
| { | ||
| throw new InvalidOperationException($"{nameof(AzureServiceBusServerlessTransport)} needs to be configured"); | ||
| } | ||
|
|
||
| if (transport is not AzureServiceBusServerlessTransport serverlessTransport) | ||
| { | ||
| throw new InvalidOperationException($"Endpoint must be configured with an {nameof(AzureServiceBusServerlessTransport)}."); | ||
| } | ||
|
|
||
| return serverlessTransport; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a separate code path for send-only seems to make validation and other things much cleaner