diff --git a/.netconfig b/.netconfig index e66fb8d..8934c7b 100644 --- a/.netconfig +++ b/.netconfig @@ -158,3 +158,13 @@ weak etag = 371342087cec0269473a91f7fc295fd1049e21f016a1b7113614f2c4e3eefe5f +[file "src/WhatsApp/Extensions/AsyncLazy.cs"] + url = https://github.com/devlooped/catbag/blob/main/System/Threading/Tasks/AsyncLazy.cs + sha = 9f3330f09713aa5f746047e3a50ee839147a5797 + etag = 73320600b7a18e0eb25cadc3d687c69dc79181b0458facf526666e150c634782 + weak +[file "src/WhatsApp/Extensions/Base62.cs"] + url = https://github.com/devlooped/catbag/blob/main/System/Base62.cs + sha = cf76df0d6a218c26ebe117339fe3445050b0532a + etag = aed711a45e051edfddfcb76d9f8021d30f9817c342cfe8d1cc38f2af37b47aa8 + weak diff --git a/src/SampleApp/Sample/Program.cs b/src/SampleApp/Sample/Program.cs index fd3fcff..d6f125a 100644 --- a/src/SampleApp/Sample/Program.cs +++ b/src/SampleApp/Sample/Program.cs @@ -6,6 +6,7 @@ using Devlooped; using Devlooped.WhatsApp; using Microsoft.Azure.Functions.Worker.Builder; +using Microsoft.Extensions.Caching.Hybrid; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; diff --git a/src/SampleApp/Sample/Sample.csproj b/src/SampleApp/Sample/Sample.csproj index f76d6c4..9f3c55a 100644 --- a/src/SampleApp/Sample/Sample.csproj +++ b/src/SampleApp/Sample/Sample.csproj @@ -24,6 +24,7 @@ + diff --git a/src/Tests/IdempotencyTests.cs b/src/Tests/IdempotencyTests.cs new file mode 100644 index 0000000..6fe0364 --- /dev/null +++ b/src/Tests/IdempotencyTests.cs @@ -0,0 +1,45 @@ +using Azure.Data.Tables; +using Microsoft.Extensions.Caching.Hybrid; +using Microsoft.Extensions.DependencyInjection; + +namespace Devlooped.WhatsApp; + +public class IdempotencyTests +{ + [Fact] + public async Task CanAddProcessedItem() + { + var client = CloudStorageAccount.DevelopmentStorageAccount.CreateTableServiceClient(); + var table = client.GetTableClient("WhatsAppWebhook"); + var collection = new ServiceCollection(); + collection.AddHybridCache(); + var cache = collection.BuildServiceProvider().GetRequiredService(); + + var idempotent = new Idempotency(client, cache); + var pk = nameof(CanAddProcessedItem); + var rk = Ulid.NewUlid().ToString(); + + // Initially unprocessed + Assert.False(await idempotent.IsProcessedAsync(pk, rk)); + + // The etag is used for optimistic concurrency on resetting + var etag = await idempotent.TrySetProcessedAsync(pk, rk); + + Assert.NotNull(etag); + Assert.True(await idempotent.IsProcessedAsync(pk, rk)); + + // Can't set again, the 409 conflict will mark it as true if it isn't already. + Assert.Null(await idempotent.TrySetProcessedAsync(pk, rk)); + + // Simulates a failure in processing so we're returning the item to the processing pool + await idempotent.ResetProcessedAsync(pk, rk, etag.Value); + + Assert.False((await table.GetEntityIfExistsAsync(pk, rk)).HasValue); + + // Simulate another process picking the item up at this point and writing back to storage + await table.AddEntityAsync(new TableEntity(pk, rk)); + + // The check would now re-read from storage and see it since we restored. + Assert.True(await idempotent.IsProcessedAsync(pk, rk)); + } +} diff --git a/src/WhatsApp/AzureFunctionsWebhook.cs b/src/WhatsApp/AzureFunctionsWebhook.cs index 42a249b..8cc75f1 100644 --- a/src/WhatsApp/AzureFunctionsWebhook.cs +++ b/src/WhatsApp/AzureFunctionsWebhook.cs @@ -1,9 +1,7 @@ -using System; -using System.Diagnostics; +using System.Diagnostics; using System.Text; using System.Text.Json; using System.Text.Json.Nodes; -using Azure.Data.Tables; using Devlooped.WhatsApp.Flows; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; @@ -24,7 +22,7 @@ namespace Devlooped.WhatsApp; /// The message handler that will process incoming messages. /// The logger. class AzureFunctionsWebhook( - TableServiceClient tableClient, + Idempotency idempotency, IMessageProcessor messageProcessor, PipelineRunner runner, IWhatsAppClient whatsapp, @@ -65,19 +63,13 @@ public async Task Message([HttpTrigger(AuthorizationLevel.Anonymo if (message is UserMessage user) await user.SendProgress(whatsapp, functionOptions.ReadOnMessage is true, functionOptions.TypingOnMessage is true).Ignore(); - // Ensure idempotent processing - var table = tableClient.GetTableClient("WhatsAppWebhook"); - await table.CreateIfNotExistsAsync(); - if (await table.GetEntityIfExistsAsync(message.User.Number, message.NotificationId) is { HasValue: true } existing) - { - logger.LogInformation("Skipping already handled message {Id}", message.Id); - return new OkResult(); - } - if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content) await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore(); - if (hosting.IsDevelopment()) + // NOTE: development speed-up does check for idempotency so we avoid re-entering the pipeline while we're + // debugging and WhatsApp may interpret that as a failing callback and invoke us again. In production, though, + // we don't need to incur that cost here since the pipeline will do it before running. + if (hosting.IsDevelopment() && await idempotency.IsProcessedAsync(message, json) != true) { // Avoid enqueing to speed up local devloop _ = Task.Run(async () => diff --git a/src/WhatsApp/Extensions/AsyncLazy.cs b/src/WhatsApp/Extensions/AsyncLazy.cs new file mode 100644 index 0000000..c0955cb --- /dev/null +++ b/src/WhatsApp/Extensions/AsyncLazy.cs @@ -0,0 +1,95 @@ +// +#region License +// MIT License +// +// Copyright (c) Daniel Cazzulino +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +#endregion + +using System.Runtime.CompilerServices; + +namespace System.Threading.Tasks +{ + /// + /// Provides factory methods to create so that + /// the T can be inferred from the received value factory return type. + /// + /// + /// Usage: + /// + /// var lazy = AsyncLazy.Create(() => ...); + /// + /// var value = await lazy.Value; + /// + /// + static partial class AsyncLazy + { + /// + /// Creates an using the given . + /// + public static AsyncLazy Create(Func valueFactory) => new AsyncLazy(valueFactory); + + /// + /// Creates an using the given . + /// + public static AsyncLazy Create(Func> asyncValueFactory) => new AsyncLazy(asyncValueFactory); + } + + /// + /// A that initializes asynchronously and whose + /// can be awaited for initialization completion. + /// + /// + /// Basically taken from https://devblogs.microsoft.com/pfxteam/asynclazyt/. + /// Usage: + /// + /// var lazy = new AsyncLazy<T>(() => ...); + /// + /// var value = await lazy.Value; + /// + /// + /// The type of async lazily-initialized value. + partial class AsyncLazy : Lazy> + { + /// + /// Initializes the lazy, using to asynchronously + /// schedule the value factory execution. + /// + public AsyncLazy(Func valueFactory) : base(() => Task.Run(valueFactory)) + { } + + /// + /// Initializes the lazy, using to asynchronously + /// schedule the value factory execution. + /// + public AsyncLazy(Func> asyncValueFactory) : base(() => Task.Run(() => asyncValueFactory())) + { } + + /// + /// Allows awaiting the async lazy directly. + /// + public TaskAwaiter GetAwaiter() => Value.GetAwaiter(); + + /// + /// Gets a value indicating whether the value factory has been invoked and has run to completion. + /// + public bool IsValueFactoryCompleted => base.Value.IsCompleted; + } +} diff --git a/src/WhatsApp/Extensions/Base62.cs b/src/WhatsApp/Extensions/Base62.cs new file mode 100644 index 0000000..80c057d --- /dev/null +++ b/src/WhatsApp/Extensions/Base62.cs @@ -0,0 +1,97 @@ +// +#region License +// MIT License +// +// Copyright (c) Daniel Cazzulino +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +#endregion + +#nullable enable +using System.Linq; +using System.Numerics; +using System.Text; + +namespace System +{ + /// + /// Inspired in a bunch of searches, samples and snippets on various languages + /// and blogs and what-not on doing URL shortering :), heavily tweaked to make + /// it fully idiomatic in C#. + /// + static partial class Base62 + { + /// + /// Encodes a Guid into a base62 string. + /// + public static string ToBase62(this Guid guid) + => Encode(new BigInteger(guid.ToByteArray())); + + /// + /// Encodes the absolute numeric value into a base62 string. + /// + public static string Encode(BigInteger value) + { + if (value == 0) + return "0"; + + // normalize sign in case we got negative. + value = BigInteger.Abs(value); + + // TODO: I'm almost sure there's a more succint way + // of doing this with LINQ and Aggregate, but just + // can't figure it out... + var sb = new StringBuilder(); + + while (value != 0) + { + sb = sb.Append(ToBase62(value % 62)); + value /= 62; + } + + // Reverse the string, since we're building it backwards, + var chars = sb.ToString().ToCharArray(); + Array.Reverse(chars); + + return new string(chars); + } + + /// + /// Decodes a base62 string into its original numeric value. + /// + public static BigInteger Decode(string value) + => value.Aggregate(new BigInteger(0), (current, c) => current * 62 + FromBase62(c)); + + static char ToBase62(BigInteger d) => d switch + { + BigInteger v when v < 10 => (char)('0' + d), + BigInteger v when v < 36 => (char)('A' + d - 10), + BigInteger v when v < 62 => (char)('a' + d - 36), + _ => throw new ArgumentException($"Cannot encode digit {d} to base 62.", nameof(d)), + }; + + static BigInteger FromBase62(char c) => c switch + { + char v when c >= 'a' && v <= 'z' => 36 + c - 'a', + char v when c >= 'A' && v <= 'Z' => 10 + c - 'A', + char v when c >= '0' && v <= '9' => c - '0', + _ => throw new ArgumentException($"Cannot decode char '{c}' from base 62.", nameof(c)), + }; + } +} diff --git a/src/WhatsApp/Idempotency.cs b/src/WhatsApp/Idempotency.cs new file mode 100644 index 0000000..9fe4f2c --- /dev/null +++ b/src/WhatsApp/Idempotency.cs @@ -0,0 +1,70 @@ +using System.Numerics; +using System.Security.Cryptography; +using System.Text; +using Azure; +using Azure.Data.Tables; +using Microsoft.Extensions.Caching.Hybrid; + +namespace Devlooped.WhatsApp; + +static class IdempotencyExtensions +{ + public static ValueTask IsProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default) + => idempotency.IsProcessedAsync(message.User.Number, RowKey(message, json), token); + + public static async ValueTask TrySetProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default) + => await idempotency.TrySetProcessedAsync(message.User.Number, RowKey(message, json), token); + + public static async ValueTask ResetProcessedAsync(this Idempotency idempotency, Message message, string json, ETag etag, CancellationToken token = default) + => await idempotency.ResetProcessedAsync(message.User.Number, RowKey(message, json), etag, token); + + static string RowKey(Message message, string payload) + => message.Id.StartsWith("wamid.", StringComparison.Ordinal) ? message.Id : Base62.Encode(new BigInteger(MD5.HashData(Encoding.UTF8.GetBytes(payload)), isUnsigned: true, isBigEndian: true)); +} + +class Idempotency(TableServiceClient client, HybridCache cache) +{ + static readonly HybridCacheEntryOptions options = new() + { + LocalCacheExpiration = TimeSpan.FromDays(3), + Expiration = TimeSpan.FromDays(30) + }; + + readonly AsyncLazy table = new(async () => + { + var table = client.GetTableClient("WhatsAppWebhook"); + await table.CreateIfNotExistsAsync(); + return table; + }); + + public ValueTask IsProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default) + => cache.GetOrCreateAsync(Key(partitionKey, rowKey), + async key => await (await table).GetEntityIfExistsAsync(partitionKey, rowKey, cancellationToken: token) is { HasValue: true }, + options, cancellationToken: token); + + public async ValueTask TrySetProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default) + { + var key = Key(partitionKey, rowKey); + try + { + var entity = await (await table).AddEntityAsync(new TableEntity(partitionKey, rowKey), token); + await cache.SetAsync(key, true, options, cancellationToken: token); + return entity.Headers.ETag ?? ETag.All; + } + catch (RequestFailedException ex) when (ex.Status == 409) + { + await cache.SetAsync(key, true, options, cancellationToken: token); + return null; + } + } + + public async ValueTask ResetProcessedAsync(string partitionKey, string rowKey, ETag etag, CancellationToken token = default) + { + // If actual processing of a previously marked item failed, we want to return its unprocessed state + var key = Key(partitionKey, rowKey); + await (await table).DeleteEntityAsync(partitionKey, rowKey, etag, token); + await cache.RemoveAsync(key, token); + } + + static string Key(string partitionKey, string rowKey) => $"wa:dup:{partitionKey}/{rowKey}"; +} diff --git a/src/WhatsApp/PipelineRunner.cs b/src/WhatsApp/PipelineRunner.cs index 4abf12d..4fdb1d2 100644 --- a/src/WhatsApp/PipelineRunner.cs +++ b/src/WhatsApp/PipelineRunner.cs @@ -5,7 +5,7 @@ namespace Devlooped.WhatsApp; class PipelineRunner( - TableServiceClient tableClient, + Idempotency idempotency, IWhatsAppClient whatsapp, IWhatsAppHandler handler, IOptions functionOptions, @@ -19,6 +19,12 @@ public async Task ProcessAsync(string json) if (await Message.DeserializeAsync(json) is { } message) { + if (await idempotency.IsProcessedAsync(message, json)) + { + logger.LogInformation("Skipping already handled message {Id}", message.Id); + return; + } + // If we got a user message, we can send progress updates as configured. We ignore exceptions in the // operation since it can be a notification for an old message or it may have been deleted by the user. if (message is UserMessage user) @@ -27,20 +33,26 @@ public async Task ProcessAsync(string json) // Ensure idempotent processing at dequeue time, since we might have been called // multiple times for the same message by WhatsApp (Message method) while processing was still // happening (and therefore we didn't save the entity yet). - var table = tableClient.GetTableClient("WhatsAppWebhook"); - await table.CreateIfNotExistsAsync(); - if (await table.GetEntityIfExistsAsync(message.User.Number, message.NotificationId) is { HasValue: true } existing) + logger.LogInformation("Processing work item: {Id}", message.Id); + var etag = await idempotency.TrySetProcessedAsync(message, json); + if (etag == null) { logger.LogInformation("Skipping already handled message {Id}", message.Id); return; } - // Await all responses - // No action needed, just make sure all items are processed - await handler.HandleAsync([message]).ToArrayAsync(); - - await table.UpsertEntityAsync(new TableEntity(message.User.Number, message.Id)); - logger.LogInformation($"Completed work item: {message.Id}"); + try + { + // Await all responses + // No action needed, just make sure all items are processed + await handler.HandleAsync([message]).ToArrayAsync(); + logger.LogInformation($"Completed work item: {message.Id}"); + } + catch (Exception e) + { + logger.LogError(e, "Failed to process message {Id}", message.Id); + await idempotency.ResetProcessedAsync(message, json, etag.Value); + } } else { diff --git a/src/WhatsApp/WhatsApp.csproj b/src/WhatsApp/WhatsApp.csproj index 41529a8..8a7714d 100644 --- a/src/WhatsApp/WhatsApp.csproj +++ b/src/WhatsApp/WhatsApp.csproj @@ -23,6 +23,7 @@ + diff --git a/src/WhatsApp/WhatsAppServiceCollectionExtensions.cs b/src/WhatsApp/WhatsAppServiceCollectionExtensions.cs index 1d1a466..73a80a8 100644 --- a/src/WhatsApp/WhatsAppServiceCollectionExtensions.cs +++ b/src/WhatsApp/WhatsAppServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using Azure.Data.Tables; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; namespace Devlooped.WhatsApp; @@ -232,6 +233,8 @@ public static WhatsAppHandlerBuilder AddWhatsApp? configure) { services.AddHttpClient("whatsapp").AddStandardResilienceHandler(); + services.AddHybridCache(); + services.AddSingleton(); if (services.FirstOrDefault(x => x.ServiceType == typeof(IWhatsAppClient)) == null) services.Add(new ServiceDescriptor(typeof(IWhatsAppClient), typeof(WhatsAppClient), lifetime));