Skip to content

Commit

Permalink
added initial Akka.Persistence.Hosting implementation for Azure (#223)
Browse files Browse the repository at this point in the history
* added initial Akka.Persistence.Hosting implementation for Azure

close #222

* fixed build issues

* added sanity check for Akka.Persistence.Azure.Hosting

* close #224

fixed malformed log message

* fixed Akka.Hosting test setup
  • Loading branch information
Aaronontheweb authored Jul 21, 2022
1 parent 3111200 commit f3344af
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 6 deletions.
10 changes: 10 additions & 0 deletions Akka.Persistence.Azure.sln
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{320BFA6C
build.sh = build.sh
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Azure.Hosting", "src\Akka.Persistence.Azure.Hosting\Akka.Persistence.Azure.Hosting.csproj", "{64C6B877-9262-456B-8A1C-60C4F272DA19}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -35,6 +37,14 @@ Global
{CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.Build.0 = Release|Any CPU
{FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.Build.0 = Release|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\common.props" />
<PropertyGroup>
<TargetFramework>$(NetStandardLibVersion)</TargetFramework>
<Description>Akka.Hosting support for Akka.Persistence.Azure.</Description>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Azure\Akka.Persistence.Azure.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Persistence.Hosting" Version="0.4.0" />
</ItemGroup>

</Project>
102 changes: 102 additions & 0 deletions src/Akka.Persistence.Azure.Hosting/AzurePersistenceExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Persistence.Azure.Query;
using Akka.Persistence.Hosting;

namespace Akka.Persistence.Azure.Hosting
{
/// <summary>
/// Extension methods for Akka.Hosting and Akka.Azure.Persistence
/// </summary>
public static class AzurePersistenceExtensions
{
public const string DefaultTableName = "AkkaPersistenceDefaultTable";
public const string DefaultBlobContainerName = "akka-persistence-default-container";

private static string ToHocon(bool b)
{
return b ? "on" : "off";
}


public static AkkaConfigurationBuilder WithAzureTableJournal(this AkkaConfigurationBuilder builder,
string connectionString, bool autoInitialize = true, string tableName = DefaultTableName, Action<AkkaPersistenceJournalBuilder> configurator = null)
{
Config journalConfiguration = @$"
akka.persistence {{
journal {{
plugin = ""akka.persistence.journal.azure-table""
azure-table {{
class = ""Akka.Persistence.Azure.Journal.AzureTableStorageJournal, Akka.Persistence.Azure""
connection-string = ""{connectionString}""
# the name of the Windows Azure Table used to persist journal events
table-name = ""{tableName}""
auto-initialize = {ToHocon(autoInitialize)}
}}
}}
}}";

var finalConfig = journalConfiguration;
builder.AddHocon(finalConfig, HoconAddMode.Prepend);

// PUSH DEFAULT CONFIG TO END
builder.AddHocon(AzurePersistence.DefaultConfig, HoconAddMode.Append);

if (configurator != null) // configure event adapters
{
builder.WithJournal("azure-table", configurator);
}

return builder;
}

public static AkkaConfigurationBuilder WithAzureBlobsSnapshotStore(this AkkaConfigurationBuilder builder,
string connectionString, bool autoInitialize = true, string containerName = DefaultBlobContainerName)
{
Config journalConfiguration = @$"
akka.persistence {{
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.azure-blob-store""
azure-blob-store {{
class = ""Akka.Persistence.Azure.Snapshot.AzureBlobSnapshotStore, Akka.Persistence.Azure""
connection-string = ""{connectionString}""
# the name of the Windows Azure Table used to persist journal events
container-name = ""{containerName}""
auto-initialize = {ToHocon(autoInitialize)}
}}
}}
}}";

var finalConfig = journalConfiguration;
builder.AddHocon(finalConfig, HoconAddMode.Prepend);

// PUSH DEFAULT CONFIG TO END
builder.AddHocon(AzurePersistence.DefaultConfig, HoconAddMode.Append);

return builder;
}

/// <summary>
/// Adds both AzureTableStorage journal and AzureBlobStorage snapshot-store as the default Akka.Persistence
/// implementations for a given <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder"></param>
/// <param name="connectionString"></param>
/// <param name="autoInitialize"></param>
/// <param name="containerName"></param>
/// <param name="tableName"></param>
/// <param name="configurator"></param>
/// <returns></returns>
public static AkkaConfigurationBuilder WithAzurePersistence(this AkkaConfigurationBuilder builder,
string connectionString, bool autoInitialize = true, string containerName = DefaultBlobContainerName,
string tableName = DefaultTableName, Action<AkkaPersistenceJournalBuilder> configurator = null)
{
builder.WithAzureTableJournal(connectionString, autoInitialize, tableName, configurator);
builder.WithAzureBlobsSnapshotStore(connectionString, autoInitialize, containerName);

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Azure.Hosting\Akka.Persistence.Azure.Hosting.csproj" />
<ProjectReference Include="..\Akka.Persistence.Azure\Akka.Persistence.Azure.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Persistence.Azure.Hosting;
using Akka.TestKit.Xunit2.Internals;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Azure.Tests.Hosting
{
public class AzurePersistenceHostingSanityCheck
{
public static async Task<IHost> StartHost(Action<AkkaConfigurationBuilder> testSetup)
{
var conn = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR");
var host = new HostBuilder()
.ConfigureServices(collection =>
{
collection.AddAkka("MyActorSys", builder =>
{
builder.WithAzurePersistence(conn);
testSetup(builder);
});
}).Build();

await host.StartAsync();
return host;
}

public sealed class MyPersistenceActor : ReceivePersistentActor
{
private List<int> _values = new List<int>();

public MyPersistenceActor(string persistenceId)
{
PersistenceId = persistenceId;

Recover<SnapshotOffer>(offer =>
{
if (offer.Snapshot is IEnumerable<int> ints)
{
_values = new List<int>(ints);
}
});

Recover<int>(i => { _values.Add(i); });

Command<int>(i =>
{
Persist(i, i1 =>
{
_values.Add(i);
if (LastSequenceNr % 2 == 0)
{
SaveSnapshot(_values);
}
Sender.Tell("ACK");
});
});

Command<string>(str => str.Equals("getall"), s => { Sender.Tell(_values.ToArray()); });

Command<SaveSnapshotSuccess>(s => { });
}

public override string PersistenceId { get; }
}

private readonly ITestOutputHelper _output;

public AzurePersistenceHostingSanityCheck(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task ShouldLaunchAzurePersistence()
{
// arrange
using var host = await StartHost(builder => {
builder.StartActors((system, registry) =>
{
var myActor = system.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1");
registry.Register<MyPersistenceActor>(myActor);
})
.WithActors((system, registry) =>
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(_output)), "log-test");
logger.Tell(new InitializeLogger(system.EventStream));
});
});

var actorSystem = host.Services.GetRequiredService<ActorSystem>();
var actorRegistry = host.Services.GetRequiredService<ActorRegistry>();
var myPersistentActor = actorRegistry.Get<MyPersistenceActor>();

// act
var resp1 = await myPersistentActor.Ask<string>(1, TimeSpan.FromSeconds(3));
var resp2 = await myPersistentActor.Ask<string>(2, TimeSpan.FromSeconds(3));
var snapshot = await myPersistentActor.Ask<int[]>("getall", TimeSpan.FromSeconds(3));

// assert
snapshot.Should().BeEquivalentTo(new[] {1, 2});

// kill + recreate actor with same PersistentId
await myPersistentActor.GracefulStop(TimeSpan.FromSeconds(3));
var myPersistentActor2 = actorSystem.ActorOf(Props.Create(() => new MyPersistenceActor("ac1")), "actor1a");

var snapshot2 = await myPersistentActor2.Ask<int[]>("getall", TimeSpan.FromSeconds(3));
snapshot2.Should().BeEquivalentTo(new[] {1, 2});

// validate configs
var config = actorSystem.Settings.Config;
config.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.azure-table");
config.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.azure-blob-store");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in allPersistenceResponse.Value)
_log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status);
_log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status);

if (HasPersistenceIdSubscribers || HasAllPersistenceIdSubscribers)
{
Expand All @@ -391,7 +391,7 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in eventTagsResponse.Value)
_log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status);
_log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status);

if (HasTagSubscribers && taggedEntries.Count != 0)
{
Expand Down
9 changes: 5 additions & 4 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
<PropertyGroup>
<Copyright>Copyright © 2017-2020 Petabridge</Copyright>
<Authors>Petabridge</Authors>
<VersionPrefix>0.7.1</VersionPrefix>
<PackageReleaseNotes>Release of Akka.Persistence.Azure**
- Upgraded to [Akka.NET v1.4.12](https://github.com/akkadotnet/akka.net/releases/tag/1.4.12)
- Added backoff/retry mechanism for `AzureSnapshotStore`</PackageReleaseNotes>
<VersionPrefix>0.8.4</VersionPrefix>
<PackageReleaseNotes>Upgraded to [Akka.NET 1.4.39](https://github.com/akkadotnet/akka.net/releases/tag/1.4.39)
[Update Azure.Identity to 1.6.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/205)
[Update System.Linq.Async to 6.0.1](https://github.com/petabridge/Akka.Persistence.Azure/pull/198)
[Upgrade `Microsoft.Azure.Consmos.Table` to `Azure.Data.Tables` 12.5.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/207)</PackageReleaseNotes>
<PackageIconUrl>
</PackageIconUrl>
<PackageProjectUrl>
Expand Down

0 comments on commit f3344af

Please sign in to comment.