Skip to content

Commit

Permalink
reproduced #3431
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed May 3, 2018
1 parent e3752c1 commit 1b61458
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 4 deletions.
86 changes: 86 additions & 0 deletions src/core/Akka.Persistence.Tests/Bug3431FixSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Snapshot;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Persistence.Tests
{
/// <summary>
/// Specs to validate the whether or not https://github.com/akkadotnet/akka.net/issues/3431 is an issue
/// </summary>
public class Bug3431FixSpec : AkkaSpec
{
public Bug3431FixSpec(ITestOutputHelper helper) : base(helper) { }

/// <summary>
/// Aggressively saves snapshots to the <see cref="LocalSnapshotStore"/> without
/// incrementing any sequence numbers. Tries on purpose to create two conflicting snapshots
/// with the same ID at the same time.
/// </summary>
public class AggressiveSnapshotStoreActor : ReceivePersistentActor
{
private readonly IActorRef _testActor;
private int _currentCount = 0; // this is our snapshot value
public override string PersistenceId => Context.Self.Path.Name;

public AggressiveSnapshotStoreActor(IActorRef testActor)
{
_testActor = testActor;

Command<SaveSnapshotSuccess>(_ => _testActor.Tell(_));

Command<SaveSnapshotFailure>(_ => _testActor.Tell(_));

CommandAny(_ =>
{
_currentCount++;
SaveSnapshot(_currentCount);
});

Recover<SnapshotOffer>(offer =>
{

});
}

protected override void PostStop()
{
DeleteSnapshot(0); // delete our snapshot
}
}

[Fact]
public void Should_save_concurrent_Snapshots_to_LocalSnapshotStore()
{
var snapshotCount = 25;
var snapshotActor = Sys.ActorOf(Props.Create(() => new AggressiveSnapshotStoreActor(TestActor)));

for (var i = 0; i < snapshotCount; i++)
{
snapshotActor.Tell(i);
}

var msgs = ReceiveN(snapshotCount, TimeSpan.FromSeconds(10));
var allSaved = msgs.All(x => x is SaveSnapshotSuccess);
if (!allSaved)
{
var exceptions = msgs.Where(x => !(x is SaveSnapshotSuccess)).ToList();
var errorMsg = "";
foreach (var ex in exceptions)
{
errorMsg += ex.ToString();
}
false.Should().BeTrue("expected all snapshots to be saved, but found {0} that were not. Output: {1}",
exceptions.Count, errorMsg);
}
}
}
}
9 changes: 5 additions & 4 deletions src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ protected virtual void Save(SnapshotMetadata metadata, object snapshot)
{
Serialize(stream, new Serialization.Snapshot(snapshot));
});

var newName = GetSnapshotFileForWrite(metadata);
if (File.Exists(newName.FullName))
{
Expand Down Expand Up @@ -217,11 +218,11 @@ protected void Serialize(Stream stream, Serialization.Snapshot snapshot)
}

/// <summary>
/// TBD
/// Creates a new temporary file to store the current snapshot.
/// </summary>
/// <param name="metadata">TBD</param>
/// <param name="p">TBD</param>
/// <returns>TBD</returns>
/// <param name="metadata">The snapshot metadata.</param>
/// <param name="p">Delegate that will process the contents of the snapshot file.</param>
/// <returns>The populated snapshot file.</returns>
protected FileInfo WithOutputStream(SnapshotMetadata metadata, Action<Stream> p)
{
var tmpFile = GetSnapshotFileForWrite(metadata, ".tmp");
Expand Down

0 comments on commit 1b61458

Please sign in to comment.