Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .netconfig
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,13 @@

weak
etag = 371342087cec0269473a91f7fc295fd1049e21f016a1b7113614f2c4e3eefe5f
[file "src/WhatsApp/Extensions/AsyncLazy.cs"]
url = https://github.com/devlooped/catbag/blob/main/System/Threading/Tasks/AsyncLazy.cs
sha = 9f3330f09713aa5f746047e3a50ee839147a5797
etag = 73320600b7a18e0eb25cadc3d687c69dc79181b0458facf526666e150c634782
weak
[file "src/WhatsApp/Extensions/Base62.cs"]
url = https://github.com/devlooped/catbag/blob/main/System/Base62.cs
sha = cf76df0d6a218c26ebe117339fe3445050b0532a
etag = aed711a45e051edfddfcb76d9f8021d30f9817c342cfe8d1cc38f2af37b47aa8
weak
1 change: 1 addition & 0 deletions src/SampleApp/Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Devlooped;
using Devlooped.WhatsApp;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Caching.Hybrid;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down
1 change: 1 addition & 0 deletions src/SampleApp/Sample/Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.12.0" />
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.12.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.3" />
<PackageReference Include="Microsoft.Extensions.Caching.Hybrid" Version="9.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\WhatsApp\WhatsApp.csproj" />
Expand Down
45 changes: 45 additions & 0 deletions src/Tests/IdempotencyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Azure.Data.Tables;
using Microsoft.Extensions.Caching.Hybrid;
using Microsoft.Extensions.DependencyInjection;

namespace Devlooped.WhatsApp;

public class IdempotencyTests
{
[Fact]
public async Task CanAddProcessedItem()
{
var client = CloudStorageAccount.DevelopmentStorageAccount.CreateTableServiceClient();
var table = client.GetTableClient("WhatsAppWebhook");
var collection = new ServiceCollection();
collection.AddHybridCache();
var cache = collection.BuildServiceProvider().GetRequiredService<HybridCache>();

var idempotent = new Idempotency(client, cache);
var pk = nameof(CanAddProcessedItem);
var rk = Ulid.NewUlid().ToString();

// Initially unprocessed
Assert.False(await idempotent.IsProcessedAsync(pk, rk));

// The etag is used for optimistic concurrency on resetting
var etag = await idempotent.TrySetProcessedAsync(pk, rk);

Assert.NotNull(etag);
Assert.True(await idempotent.IsProcessedAsync(pk, rk));

// Can't set again, the 409 conflict will mark it as true if it isn't already.
Assert.Null(await idempotent.TrySetProcessedAsync(pk, rk));

// Simulates a failure in processing so we're returning the item to the processing pool
await idempotent.ResetProcessedAsync(pk, rk, etag.Value);

Assert.False((await table.GetEntityIfExistsAsync<TableEntity>(pk, rk)).HasValue);

// Simulate another process picking the item up at this point and writing back to storage
await table.AddEntityAsync(new TableEntity(pk, rk));

// The check would now re-read from storage and see it since we restored.
Assert.True(await idempotent.IsProcessedAsync(pk, rk));
}
}
20 changes: 6 additions & 14 deletions src/WhatsApp/AzureFunctionsWebhook.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using System;
using System.Diagnostics;
using System.Diagnostics;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using Azure.Data.Tables;
using Devlooped.WhatsApp.Flows;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
Expand All @@ -24,7 +22,7 @@ namespace Devlooped.WhatsApp;
/// <param name="handler">The message handler that will process incoming messages.</param>
/// <param name="logger">The logger.</param>
class AzureFunctionsWebhook(
TableServiceClient tableClient,
Idempotency idempotency,
IMessageProcessor messageProcessor,
PipelineRunner runner,
IWhatsAppClient whatsapp,
Expand Down Expand Up @@ -65,19 +63,13 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
if (message is UserMessage user)
await user.SendProgress(whatsapp, functionOptions.ReadOnMessage is true, functionOptions.TypingOnMessage is true).Ignore();

// Ensure idempotent processing
var table = tableClient.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
{
logger.LogInformation("Skipping already handled message {Id}", message.Id);
return new OkResult();
}

if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content)
await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore();

if (hosting.IsDevelopment())
// NOTE: development speed-up does check for idempotency so we avoid re-entering the pipeline while we're
// debugging and WhatsApp may interpret that as a failing callback and invoke us again. In production, though,
// we don't need to incur that cost here since the pipeline will do it before running.
if (hosting.IsDevelopment() && await idempotency.IsProcessedAsync(message, json) != true)
{
// Avoid enqueing to speed up local devloop
_ = Task.Run(async () =>
Expand Down
95 changes: 95 additions & 0 deletions src/WhatsApp/Extensions/AsyncLazy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// <auto-generated />
#region License
// MIT License
//
// Copyright (c) Daniel Cazzulino
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#endregion

using System.Runtime.CompilerServices;

namespace System.Threading.Tasks
{
/// <summary>
/// Provides factory methods to create <see cref="AsyncLazy{T}"/> so that
/// the <c>T</c> can be inferred from the received value factory return type.
/// </summary>
/// <remarks>
/// Usage:
/// <code>
/// var lazy = AsyncLazy.Create(() => ...);
///
/// var value = await lazy.Value;
/// </code>
/// </remarks>
static partial class AsyncLazy
{
/// <summary>
/// Creates an <see cref="AsyncLazy{T}"/> using the given <paramref name="valueFactory"/>.
/// </summary>
public static AsyncLazy<T> Create<T>(Func<T> valueFactory) => new AsyncLazy<T>(valueFactory);

/// <summary>
/// Creates an <see cref="AsyncLazy{T}"/> using the given <paramref name="asyncValueFactory"/>.
/// </summary>
public static AsyncLazy<T> Create<T>(Func<Task<T>> asyncValueFactory) => new AsyncLazy<T>(asyncValueFactory);
}

/// <summary>
/// A <see cref="Lazy{T}"/> that initializes asynchronously and whose
/// <see cref="Lazy{T}.Value"/> can be awaited for initialization completion.
/// </summary>
/// <remarks>
/// Basically taken from https://devblogs.microsoft.com/pfxteam/asynclazyt/.
/// Usage:
/// <code>
/// var lazy = new AsyncLazy&lt;T&gt;(() => ...);
///
/// var value = await lazy.Value;
/// </code>
/// </remarks>
/// <typeparam name="T">The type of async lazily-initialized value.</typeparam>
partial class AsyncLazy<T> : Lazy<Task<T>>
{
/// <summary>
/// Initializes the lazy, using <see cref="Task.Run(Func{T})"/> to asynchronously
/// schedule the value factory execution.
/// </summary>
public AsyncLazy(Func<T> valueFactory) : base(() => Task.Run(valueFactory))
{ }

/// <summary>
/// Initializes the lazy, using <see cref="Task.Run(Func{Task{T}})"/> to asynchronously
/// schedule the value factory execution.
/// </summary>
public AsyncLazy(Func<Task<T>> asyncValueFactory) : base(() => Task.Run(() => asyncValueFactory()))
{ }

/// <summary>
/// Allows awaiting the async lazy directly.
/// </summary>
public TaskAwaiter<T> GetAwaiter() => Value.GetAwaiter();

/// <summary>
/// Gets a value indicating whether the value factory has been invoked and has run to completion.
/// </summary>
public bool IsValueFactoryCompleted => base.Value.IsCompleted;
}
}
97 changes: 97 additions & 0 deletions src/WhatsApp/Extensions/Base62.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// <auto-generated />
#region License
// MIT License
//
// Copyright (c) Daniel Cazzulino
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#endregion

#nullable enable
using System.Linq;
using System.Numerics;
using System.Text;

namespace System
{
/// <summary>
/// Inspired in a bunch of searches, samples and snippets on various languages
/// and blogs and what-not on doing URL shortering :), heavily tweaked to make
/// it fully idiomatic in C#.
/// </summary>
static partial class Base62
{
/// <summary>
/// Encodes a Guid into a base62 string.
/// </summary>
public static string ToBase62(this Guid guid)
=> Encode(new BigInteger(guid.ToByteArray()));

/// <summary>
/// Encodes the absolute numeric value into a base62 string.
/// </summary>
public static string Encode(BigInteger value)
{
if (value == 0)
return "0";

// normalize sign in case we got negative.
value = BigInteger.Abs(value);

// TODO: I'm almost sure there's a more succint way
// of doing this with LINQ and Aggregate, but just
// can't figure it out...
var sb = new StringBuilder();

while (value != 0)
{
sb = sb.Append(ToBase62(value % 62));
value /= 62;
}

// Reverse the string, since we're building it backwards,
var chars = sb.ToString().ToCharArray();
Array.Reverse(chars);

return new string(chars);
}

/// <summary>
/// Decodes a base62 string into its original numeric value.
/// </summary>
public static BigInteger Decode(string value)
=> value.Aggregate(new BigInteger(0), (current, c) => current * 62 + FromBase62(c));

static char ToBase62(BigInteger d) => d switch
{
BigInteger v when v < 10 => (char)('0' + d),
BigInteger v when v < 36 => (char)('A' + d - 10),
BigInteger v when v < 62 => (char)('a' + d - 36),
_ => throw new ArgumentException($"Cannot encode digit {d} to base 62.", nameof(d)),
};

static BigInteger FromBase62(char c) => c switch
{
char v when c >= 'a' && v <= 'z' => 36 + c - 'a',
char v when c >= 'A' && v <= 'Z' => 10 + c - 'A',
char v when c >= '0' && v <= '9' => c - '0',
_ => throw new ArgumentException($"Cannot decode char '{c}' from base 62.", nameof(c)),
};
}
}
70 changes: 70 additions & 0 deletions src/WhatsApp/Idempotency.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Numerics;
using System.Security.Cryptography;
using System.Text;
using Azure;
using Azure.Data.Tables;
using Microsoft.Extensions.Caching.Hybrid;

namespace Devlooped.WhatsApp;

static class IdempotencyExtensions
{
public static ValueTask<bool> IsProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default)
=> idempotency.IsProcessedAsync(message.User.Number, RowKey(message, json), token);

public static async ValueTask<ETag?> TrySetProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default)
=> await idempotency.TrySetProcessedAsync(message.User.Number, RowKey(message, json), token);

public static async ValueTask ResetProcessedAsync(this Idempotency idempotency, Message message, string json, ETag etag, CancellationToken token = default)
=> await idempotency.ResetProcessedAsync(message.User.Number, RowKey(message, json), etag, token);

static string RowKey(Message message, string payload)
=> message.Id.StartsWith("wamid.", StringComparison.Ordinal) ? message.Id : Base62.Encode(new BigInteger(MD5.HashData(Encoding.UTF8.GetBytes(payload)), isUnsigned: true, isBigEndian: true));
}

class Idempotency(TableServiceClient client, HybridCache cache)
{
static readonly HybridCacheEntryOptions options = new()
{
LocalCacheExpiration = TimeSpan.FromDays(3),
Expiration = TimeSpan.FromDays(30)
};

readonly AsyncLazy<TableClient> table = new(async () =>
{
var table = client.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
return table;
});

public ValueTask<bool> IsProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default)
=> cache.GetOrCreateAsync(Key(partitionKey, rowKey),
async key => await (await table).GetEntityIfExistsAsync<TableEntity>(partitionKey, rowKey, cancellationToken: token) is { HasValue: true },
options, cancellationToken: token);

public async ValueTask<ETag?> TrySetProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default)
{
var key = Key(partitionKey, rowKey);
try
{
var entity = await (await table).AddEntityAsync(new TableEntity(partitionKey, rowKey), token);
await cache.SetAsync(key, true, options, cancellationToken: token);
return entity.Headers.ETag ?? ETag.All;
}
catch (RequestFailedException ex) when (ex.Status == 409)
{
await cache.SetAsync(key, true, options, cancellationToken: token);
return null;
}
}

public async ValueTask ResetProcessedAsync(string partitionKey, string rowKey, ETag etag, CancellationToken token = default)
{
// If actual processing of a previously marked item failed, we want to return its unprocessed state
var key = Key(partitionKey, rowKey);
await (await table).DeleteEntityAsync(partitionKey, rowKey, etag, token);
await cache.RemoveAsync(key, token);
}

static string Key(string partitionKey, string rowKey) => $"wa:dup:{partitionKey}/{rowKey}";
}
Loading