Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/SampleApp/Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,20 @@
// Uncomment next line to render a JSON of text message/responses
//.UseConsoleRender();

// If event grid is set up, switch to processing messages using that
if (builder.Configuration["EventGrid:Topic"] is { Length: > 0 } topic &&
builder.Configuration["EventGrid:Key"] is { Length: > 0 } key)
if (builder.Environment.IsProduction())
{
whatsapp.UseEventGridProcessor(new EventGridPublisherClient(
new Uri(topic), new Azure.AzureKeyCredential(key)));
// If event grid is set up, switch to processing messages using that
if (builder.Configuration["EventGrid:Topic"] is { Length: > 0 } topic &&
builder.Configuration["EventGrid:Key"] is { Length: > 0 } key)
{
whatsapp.UseEventGridProcessor(new EventGridPublisherClient(
new Uri(topic), new Azure.AzureKeyCredential(key)));
}
}

if (builder.Environment.IsDevelopment())
else
{
whatsapp.UseTaskSchedulerProcessor();

// Make sure we never timeout when calling back to the console
builder.Services.AddHttpClient()
.ConfigureHttpClientDefaults(client => client.ConfigureHttpClient(http =>
Expand Down
26 changes: 1 addition & 25 deletions src/WhatsApp/AzureFunctionsWebhook.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ namespace Devlooped.WhatsApp;
/// <param name="handler">The message handler that will process incoming messages.</param>
/// <param name="logger">The logger.</param>
class AzureFunctionsWebhook(
Idempotency idempotency,
IMessageProcessor messageProcessor,
PipelineRunner runner,
IWhatsAppClient whatsapp,
IWhatsAppHandler handler,
IOptions<MetaOptions> metaOptions,
Expand Down Expand Up @@ -66,29 +64,7 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content)
await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore();

// NOTE: development speed-up does check for idempotency so we avoid re-entering the pipeline while we're
// debugging and WhatsApp may interpret that as a failing callback and invoke us again. In production, though,
// we don't need to incur that cost here since the pipeline will do it before running.
if (hosting.IsDevelopment() && await idempotency.IsProcessedAsync(message, json) != true)
{
// Avoid enqueing to speed up local devloop
_ = Task.Run(async () =>
{
try
{
await runner.ProcessAsync(json);
}
catch (Exception e)
{
logger.LogError(e, "Failed to process message");
}
});
}
else
{
// Otherwise, enqueue the message processing
await messageProcessor.EnqueueAsync(json);
}
await messageProcessor.EnqueueAsync(json);
}
else
{
Expand Down
45 changes: 45 additions & 0 deletions src/WhatsApp/TaskSchedulerProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Devlooped.WhatsApp;

/// <summary>
/// Provides extensions for processing WhatsApp messages asynchronusly
/// using the running app <see cref="TaskScheduler"/>.
/// </summary>
public static class TaskSchedulerProcessorExtensions
{
/// <summary>
/// Uses the default <see cref="TaskScheduler"/> (or a previously registered one)
/// to process WhatsApp messages asynchronously without delegating to an external
/// system (like queues or event grid).
/// </summary>
public static WhatsAppHandlerBuilder UseTaskSchedulerProcessor(this WhatsAppHandlerBuilder builder)
{
Throw.IfNull(builder);

if (builder.Services.FirstOrDefault(x => x.ServiceType == typeof(IMessageProcessor)) is { } processor)
builder.Services.Remove(processor);

builder.Services.TryAddSingleton(TaskScheduler.Default);
builder.Services.AddSingleton<IMessageProcessor, TaskSchedulerMessageProcessor>();

return builder;
}

class TaskSchedulerMessageProcessor(PipelineRunner runner, TaskScheduler scheduler) : IMessageProcessor
{
public Task EnqueueAsync(string json, CancellationToken cancellation = default)
{
_ = Task.Factory.StartNew(
async () => await ProcessAsync(json),
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
scheduler).Unwrap();

return Task.CompletedTask;
}

async Task ProcessAsync(string json) => await runner.ProcessAsync(json);
}
}