Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/IntegrationTest/GeneratedCode/NServiceBusEndpoints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace NServiceBus;

using Microsoft.Azure.Functions.Worker.Builder;

public static class NServiceBusEndpoints
{
public static FunctionManifest ReceiverEndpoint = new("ReceiverEndpoint", "ReceiverEndpoint", "AzureWebJobsServiceBus");
public static FunctionManifest AnotherReceiverEndpoint = new("AnotherReceiverEndpoint", "AnotherReceiverEndpoint", "AzureWebJobsServiceBus");
}


public record AnotherReceiverEndpoint2() : FunctionManifest("AnotherReceiverEndpoint2", "AnotherReceiverEndpoint2", "AzureWebJobsServiceBus");

public record AnotherReceiverEndpoint3() : FunctionManifest("AnotherReceiverEndpoint3", "AnotherReceiverEndpoint3", "AzureWebJobsServiceBus");

public static class FunctionsHostApplicationBuilderExtensions
{
public static void AddAnotherEndpoint3NServiceBusFunction(
this FunctionsApplicationBuilder builder,
Action<EndpointConfiguration> configure) =>
builder.AddNServiceBusFunction<AnotherReceiverEndpoint3>(configure);
}
2 changes: 1 addition & 1 deletion src/IntegrationTest/HttpSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public async Task<HttpResponseData> Run(
_ = executionContext; // For now
logger.LogInformation("C# HTTP trigger function received a request.");

await session.Send("ReceiverEndpoint", new TriggerMessage()).ConfigureAwait(false);
await session.Send(new TriggerMessage()).ConfigureAwait(false);

var r = req.CreateResponse(HttpStatusCode.OK);
await r.WriteStringAsync($"{nameof(TriggerMessage)} sent.")
Expand Down
66 changes: 53 additions & 13 deletions src/IntegrationTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,36 @@

builder.Services.AddHostedService<InitializeLogger>();

builder.AddNServiceBusFunction("SenderEndpoint", endpoint =>
// Send-only using a separate API since they are not "functions"
builder.AddSendOnlyNServiceBusEndpoint("SenderEndpoint", endpoint =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a separate code path for send-only seems to make validation and other things much cleaner

{
endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default));
endpoint.SendOnly();
var transport = new AzureServiceBusServerlessTransport(TopicTopology.Default)
{
//send only endpoints might need to set the connection name
ConnectionName = "AzureWebJobsServiceBus"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the use case where users might want to control the connection name, if so we might want to add this option to AddSendOnlyNServiceBusEndpoint and not on the transport definition

};

var routing = endpoint.UseTransport(transport);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Demo use of routing


routing.RouteToEndpoint(typeof(TriggerMessage), "ReceiverEndpoint");
endpoint.UseSerialization<SystemJsonSerializer>();
});


//NOTE: forgetting to register a function leads to "Exception: Unable to resolve service for type 'NServiceBus.AzureFunctions.AzureServiceBus.IMessageProcessor' while attempting to activate 'IntegrationTest.AnotherReceiverEndpoint3Function'."

//option 1: No source gen on the configuration side, user needs to use correct name, queue and connection name as the function definition
builder.AddNServiceBusFunction("ReceiverEndpoint", endpoint =>
{
endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default));
var transport = new AzureServiceBusServerlessTransport(TopicTopology.Default)
{
//this needs to match
ConnectionName = "AzureWebJobsServiceBus"
};

// if they differ this needs to be done
endpoint.OverrideLocalAddress("ReceiverEndpoint");
endpoint.UseTransport(transport);
endpoint.EnableInstallers();
endpoint.UsePersistence<LearningPersistence>();
endpoint.UseSerialization<SystemJsonSerializer>();
Expand All @@ -31,17 +51,37 @@
endpoint.AddHandler<SomeEventMessageHandler>();
});

builder.AddNServiceBusFunction("AnotherReceiverEndpoint", endpoint =>
//option 2: Pass in a manifest that we have source genned
builder.AddNServiceBusFunction(NServiceBusEndpoints.AnotherReceiverEndpoint, configuration =>
{
endpoint.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default)
{
ConnectionName = "AnotherServiceBusConnection"
});
endpoint.EnableInstallers();
endpoint.UsePersistence<LearningPersistence>();
endpoint.UseSerialization<SystemJsonSerializer>();
configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default));
configuration.EnableInstallers();
configuration.UsePersistence<LearningPersistence>();
configuration.UseSerialization<SystemJsonSerializer>();

endpoint.AddHandler<SomeEventMessageHandler>();
configuration.AddHandler<SomeEventMessageHandler>();
});

//option 3: Use a type that we have source genned
builder.AddNServiceBusFunction<AnotherReceiverEndpoint2>(configuration =>
{
configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default));
configuration.EnableInstallers();
configuration.UsePersistence<LearningPersistence>();
configuration.UseSerialization<SystemJsonSerializer>();

configuration.AddHandler<SomeEventMessageHandler>();
});

//option 4: Use source genned method
builder.AddAnotherEndpoint3NServiceBusFunction(configuration =>
{
configuration.UseTransport(new AzureServiceBusServerlessTransport(TopicTopology.Default));
configuration.EnableInstallers();
configuration.UsePersistence<LearningPersistence>();
configuration.UseSerialization<SystemJsonSerializer>();

configuration.AddHandler<SomeEventMessageHandler>();
});

var host = builder.Build();
Expand Down
44 changes: 37 additions & 7 deletions src/IntegrationTest/ReceiverEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,56 @@ namespace IntegrationTest;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.AzureFunctions.AzureServiceBus;

public class ReceiverEndpoint([FromKeyedServices("ReceiverEndpoint")] IMessageProcessor processor)
public class ReceiverEndpointFunction([FromKeyedServices("ReceiverEndpoint")] IMessageProcessor processor)
{
[Function("ReceiverEndpoint")]
public Task Receiver(
[ServiceBusTrigger("ReceiverEndpoint", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken = default)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were leftovers from the old repo and is, at least currently, not needed

CancellationToken cancellationToken = default)
{
return processor.Process(message, messageActions, context, cancellationToken);
return processor.Process(message, cancellationToken);
}
}

public class AnotherReceiverEndpoint([FromKeyedServices("AnotherReceiverEndpoint")] IMessageProcessor processor)
public class AnotherReceiverEndpointFunction([FromKeyedServices("AnotherReceiverEndpoint")] IMessageProcessor processor)
{
[Function("AnotherReceiverEndpoint")]
public Task Receiver(
[ServiceBusTrigger("AnotherReceiverEndpoint", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken = default)
ServiceBusReceivedMessage message, CancellationToken cancellationToken = default)
{
return processor.Process(message, cancellationToken);
}
}

public class AnotherReceiverEndpoint2Function([FromKeyedServices("AnotherReceiverEndpoint2")] IMessageProcessor processor)
{
[Function("AnotherReceiverEndpoint2")]
public Task Receiver(
[ServiceBusTrigger("AnotherReceiverEndpoint2", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
ServiceBusReceivedMessage message, CancellationToken cancellationToken = default)
{
return processor.Process(message, messageActions, context, cancellationToken);
return processor.Process(message, cancellationToken);
}
}

public class AnotherReceiverEndpoint3Function
{
[Function("AnotherReceiverEndpoint3")]
public Task Receiver(
[ServiceBusTrigger("AnotherReceiverEndpoint3", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
ServiceBusReceivedMessage message, FunctionContext functionContext, CancellationToken cancellationToken = default)
{
//demo using service locator to avoid having to add a ctor
var processor = functionContext.InstanceServices.GetKeyedService<IMessageProcessor>("AnotherReceiverEndpoint3");

if (processor is null)
{
//which also allows us to throw a better exception
throw new InvalidOperationException("AnotherReceiverEndpoint3 is not configured, please add AddBlahBla to your program.cs");
}

return processor.Process(message, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,19 @@ namespace NServiceBus;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.AzureFunctions.AzureServiceBus;
using NServiceBus.AzureFunctions.AzureServiceBus.Serverless.TransportWrapper;
using NServiceBus.Transport;

public class AzureServiceBusServerlessTransport : TransportDefinition
public class AzureServiceBusServerlessTransport(TopicTopology topology) : TransportDefinition(TransportTransactionMode.ReceiveOnly,
supportsDelayedDelivery: true,
supportsPublishSubscribe: true,
supportsTTBR: true)
{
readonly AzureServiceBusTransport innerTransport;

public AzureServiceBusServerlessTransport(TopicTopology topology)
: base(TransportTransactionMode.ReceiveOnly,
supportsDelayedDelivery: true,
supportsPublishSubscribe: true,
supportsTTBR: true)
{
innerTransport = new AzureServiceBusTransport("TransportWillBeInitializedCorrectlyLater", topology)
{
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
};
}

protected override void ConfigureServicesCore(IServiceCollection services) => innerTransport.ConfigureServices(services);

public string ConnectionName { get; set; } = DefaultServiceBusConnectionName;

internal IInternalMessageProcessor MessageProcessor { get; private set; } = null!;
internal PipelineInvokingMessageProcessor? MessageProcessor { get; private set; }

public override async Task<TransportInfrastructure> Initialize(
HostSettings hostSettings,
Expand Down Expand Up @@ -68,15 +56,16 @@ public override async Task<TransportInfrastructure> Initialize(

var isSendOnly = hostSettings.CoreSettings.GetOrDefault<bool>(SendOnlyConfigKey);

MessageProcessor = isSendOnly
? new SendOnlyMessageProcessor()
: (IInternalMessageProcessor)serverlessTransportInfrastructure.Receivers[MainReceiverId];
if (!isSendOnly)
{
MessageProcessor = (PipelineInvokingMessageProcessor)serverlessTransportInfrastructure.Receivers[MainReceiverId];
}

return serverlessTransportInfrastructure;
}

public override IReadOnlyCollection<TransportTransactionMode> GetSupportedTransactionModes() => supportedTransactionModes;
public override IReadOnlyCollection<TransportTransactionMode> GetSupportedTransactionModes() => [TransportTransactionMode.ReceiveOnly];

static AzureServiceBusTransport ConfigureTransportConnection(
string connectionName,
IConfiguration configuration,
Expand Down Expand Up @@ -123,9 +112,9 @@ static AzureServiceBusTransport ConfigureTransportConnection(
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<TokenCredential>k__BackingField")]
static extern ref TokenCredential GetTokenCredentialRef(AzureServiceBusTransport transport);

readonly AzureServiceBusTransport innerTransport = new("TransportWillBeInitializedCorrectlyLater", topology) { TransportTransactionMode = TransportTransactionMode.ReceiveOnly };

const string MainReceiverId = "Main";
const string SendOnlyConfigKey = "Endpoint.SendOnly";
internal const string SendOnlyConfigKey = "Endpoint.SendOnly";
internal const string DefaultServiceBusConnectionName = "AzureWebJobsServiceBus";

readonly TransportTransactionMode[] supportedTransactionModes = [TransportTransactionMode.ReceiveOnly];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace NServiceBus;

using Microsoft.Extensions.Hosting;

public class FunctionConfigurationValidator : IHostedService
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sure that all functions that we found have been configured by the user

{
public Task StartAsync(CancellationToken cancellationToken = default)
{
//TODO: See if we can do this in some other way
// var allFunctions = FunctionsRegistry.GetAll();
//
// var functionNotConfigured = allFunctions.Where(f => !f.Configured).Select(f => f.Name).ToArray();
// if (functionNotConfigured.Any())
// {
// throw new InvalidOperationException($"The following functions have not been configured using {nameof(FunctionsHostApplicationBuilderExtensions.AddNServiceBusFunction)}(...): {string.Join(", ", functionNotConfigured)}");
// }

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace NServiceBus;

public record FunctionManifest(string Name, string Queue, string ConnectionName)
{
public bool Configured { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus;

using System;
using AzureFunctions.AzureServiceBus;
using Configuration.AdvancedExtensibility;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -10,25 +11,92 @@ namespace NServiceBus;

public static class FunctionsHostApplicationBuilderExtensions
{
public static void AddNServiceBusFunction<TFunctionManifest>(
this FunctionsApplicationBuilder builder,
Action<EndpointConfiguration> configure) where TFunctionManifest : FunctionManifest, new()
{
var manifest = new TFunctionManifest();
builder.AddNServiceBusFunction(manifest, configure);
}

public static void AddNServiceBusFunction(
this FunctionsApplicationBuilder builder,
string endpointName,
Action<EndpointConfiguration> configure) =>
builder.AddNServiceBusFunction(new FunctionManifest(endpointName, endpointName, AzureServiceBusServerlessTransport.DefaultServiceBusConnectionName), configure);

public static void AddNServiceBusFunction(
this FunctionsApplicationBuilder builder,
FunctionManifest functionManifest,
Action<EndpointConfiguration> configure)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(endpointName);
ArgumentNullException.ThrowIfNull(functionManifest);
ArgumentNullException.ThrowIfNull(configure);

builder.Services.AddAzureClientsCore();

builder.AddNServiceBusEndpoint(endpointName, configure);
var endpointName = functionManifest.Name;
builder.AddNServiceBusEndpoint(endpointName, endpointConfiguration =>
{
configure(endpointConfiguration);

var settings = endpointConfiguration.GetSettings();
if (settings.GetOrDefault<bool>(AzureServiceBusServerlessTransport.SendOnlyConfigKey))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have a separate code path for send-only we can guard here

{
throw new InvalidOperationException($"Functions can't be send only endpoints, use {nameof(AddSendOnlyNServiceBusEndpoint)}");
}

var transport = GetTransport(settings);


if (functionManifest.Name != functionManifest.Queue)
{
endpointConfiguration.OverrideLocalAddress(functionManifest.Queue);
}

transport.ConnectionName = functionManifest.ConnectionName;

functionManifest.Configured = true;

builder.Services.AddHostedService<FunctionConfigurationValidator>();

builder.Services.AddKeyedSingleton<IMessageProcessor>(endpointName, (sp, _) => new MessageProcessor(transport, sp.GetRequiredKeyedService<MultiHosting.EndpointStarter>(endpointName)));
});
}

public static void AddSendOnlyNServiceBusEndpoint(
this FunctionsApplicationBuilder builder,
string endpointName,
Action<EndpointConfiguration> configure)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(endpointName);
ArgumentNullException.ThrowIfNull(configure);

builder.Services.AddKeyedSingleton<IMessageProcessor>(endpointName, (sp, _) =>
builder.AddNServiceBusEndpoint(endpointName, endpointConfiguration =>
{
var settings = sp.GetRequiredKeyedService<IReadOnlySettings>(endpointName);
var transport = settings.Get<TransportDefinition>() as AzureServiceBusServerlessTransport
?? throw new InvalidOperationException($"Endpoint '{endpointName}' must be configured with an AzureServiceBusServerlessTransport.");
return new MessageProcessor(transport, sp.GetRequiredKeyedService<MultiHosting.EndpointStarter>(endpointName));
configure(endpointConfiguration);

endpointConfiguration.SendOnly();

// Make sure that the correct transport is used
_ = GetTransport(endpointConfiguration.GetSettings());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugly but gets the job done for now to make sure that the user has configured a AzureServiceBusServerlessTransport

});
}

static AzureServiceBusServerlessTransport GetTransport(SettingsHolder settings)
{
if (!settings.TryGet(out TransportDefinition transport))
{
throw new InvalidOperationException($"{nameof(AzureServiceBusServerlessTransport)} needs to be configured");
}

if (transport is not AzureServiceBusServerlessTransport serverlessTransport)
{
throw new InvalidOperationException($"Endpoint must be configured with an {nameof(AzureServiceBusServerlessTransport)}.");
}

return serverlessTransport;
}
}
Loading