diff --git a/src/SampleApp/Sample/Program.cs b/src/SampleApp/Sample/Program.cs index d6f125a..8f42bbd 100644 --- a/src/SampleApp/Sample/Program.cs +++ b/src/SampleApp/Sample/Program.cs @@ -66,16 +66,20 @@ // Uncomment next line to render a JSON of text message/responses //.UseConsoleRender(); -// If event grid is set up, switch to processing messages using that -if (builder.Configuration["EventGrid:Topic"] is { Length: > 0 } topic && - builder.Configuration["EventGrid:Key"] is { Length: > 0 } key) +if (builder.Environment.IsProduction()) { - whatsapp.UseEventGridProcessor(new EventGridPublisherClient( - new Uri(topic), new Azure.AzureKeyCredential(key))); + // If event grid is set up, switch to processing messages using that + if (builder.Configuration["EventGrid:Topic"] is { Length: > 0 } topic && + builder.Configuration["EventGrid:Key"] is { Length: > 0 } key) + { + whatsapp.UseEventGridProcessor(new EventGridPublisherClient( + new Uri(topic), new Azure.AzureKeyCredential(key))); + } } - -if (builder.Environment.IsDevelopment()) +else { + whatsapp.UseTaskSchedulerProcessor(); + // Make sure we never timeout when calling back to the console builder.Services.AddHttpClient() .ConfigureHttpClientDefaults(client => client.ConfigureHttpClient(http => diff --git a/src/WhatsApp/AzureFunctionsWebhook.cs b/src/WhatsApp/AzureFunctionsWebhook.cs index 8cc75f1..3943427 100644 --- a/src/WhatsApp/AzureFunctionsWebhook.cs +++ b/src/WhatsApp/AzureFunctionsWebhook.cs @@ -22,9 +22,7 @@ namespace Devlooped.WhatsApp; /// The message handler that will process incoming messages. /// The logger. class AzureFunctionsWebhook( - Idempotency idempotency, IMessageProcessor messageProcessor, - PipelineRunner runner, IWhatsAppClient whatsapp, IWhatsAppHandler handler, IOptions metaOptions, @@ -66,29 +64,7 @@ public async Task Message([HttpTrigger(AuthorizationLevel.Anonymo if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content) await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore(); - // NOTE: development speed-up does check for idempotency so we avoid re-entering the pipeline while we're - // debugging and WhatsApp may interpret that as a failing callback and invoke us again. In production, though, - // we don't need to incur that cost here since the pipeline will do it before running. - if (hosting.IsDevelopment() && await idempotency.IsProcessedAsync(message, json) != true) - { - // Avoid enqueing to speed up local devloop - _ = Task.Run(async () => - { - try - { - await runner.ProcessAsync(json); - } - catch (Exception e) - { - logger.LogError(e, "Failed to process message"); - } - }); - } - else - { - // Otherwise, enqueue the message processing - await messageProcessor.EnqueueAsync(json); - } + await messageProcessor.EnqueueAsync(json); } else { diff --git a/src/WhatsApp/TaskSchedulerProcessor.cs b/src/WhatsApp/TaskSchedulerProcessor.cs new file mode 100644 index 0000000..136753b --- /dev/null +++ b/src/WhatsApp/TaskSchedulerProcessor.cs @@ -0,0 +1,45 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Devlooped.WhatsApp; + +/// +/// Provides extensions for processing WhatsApp messages asynchronusly +/// using the running app . +/// +public static class TaskSchedulerProcessorExtensions +{ + /// + /// Uses the default (or a previously registered one) + /// to process WhatsApp messages asynchronously without delegating to an external + /// system (like queues or event grid). + /// + public static WhatsAppHandlerBuilder UseTaskSchedulerProcessor(this WhatsAppHandlerBuilder builder) + { + Throw.IfNull(builder); + + if (builder.Services.FirstOrDefault(x => x.ServiceType == typeof(IMessageProcessor)) is { } processor) + builder.Services.Remove(processor); + + builder.Services.TryAddSingleton(TaskScheduler.Default); + builder.Services.AddSingleton(); + + return builder; + } + + class TaskSchedulerMessageProcessor(PipelineRunner runner, TaskScheduler scheduler) : IMessageProcessor + { + public Task EnqueueAsync(string json, CancellationToken cancellation = default) + { + _ = Task.Factory.StartNew( + async () => await ProcessAsync(json), + CancellationToken.None, + TaskCreationOptions.DenyChildAttach, + scheduler).Unwrap(); + + return Task.CompletedTask; + } + + async Task ProcessAsync(string json) => await runner.ProcessAsync(json); + } +} \ No newline at end of file