Skip to content

Commit

Permalink
Bugfix #4400 LWWDictionary Delta serialization (#4401)
Browse files Browse the repository at this point in the history
* Possible fix

* Attempt to reproduce bug (failed)

* Fix #4302 bug spec

* Wrong bug repro code, removed.

* Bug happened here

* Bug spec for #4367

* Add spec comment to link to the github issue page

* simplified reproduction spec for #4367

* added another repro

* added serialization error reproduction spec

* fixed main issue with serializer

* Revert "fixed main issue with serializer"

This reverts commit 9c01ddb.

* fixed serializer by deferring casts until they're actually necessary

* fixed delta merging spec

* remove debug comment

* Add #4400 bug reproduction spec (similar serialization problem)

* Rename spec unit test name

* Bug fix for #4400

* Fix spec

* Revert "Fix spec"

This reverts commit 324da7a.

* Fix racy spec

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored May 4, 2020
1 parent c933863 commit a804b05
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using Akka.DistributedData.Internal;

namespace Akka.DistributedData.Tests
{
Expand Down Expand Up @@ -125,5 +127,39 @@ public void LWWDictionary_must_be_able_to_work_with_deltas()
{"c", 3}
});
}

/// <summary>
/// Bug reproduction: https://github.com/akkadotnet/akka.net/issues/4400
/// </summary>
[Fact]
public async Task Bugfix_4400_LWWDictionary_Deltas_must_merge_other_LWWDictionary()
{
var m1 = LWWDictionary<string, string>.Empty
.SetItem(_node1, "a", "A")
.SetItem(_node1, "b", "B1");

await Task.Delay(200);

var m2 = LWWDictionary<string, string>.Empty
.SetItem(_node2, "c", "C")
.SetItem(_node2, "b", "B2");

// This is how deltas really get merged inside the replicator
var dataEnvelope = new DataEnvelope(m1.Delta);
if (dataEnvelope.Data is IReplicatedDelta withDelta)
{
dataEnvelope = dataEnvelope.WithData(withDelta.Zero.MergeDelta(withDelta));
}

// Bug: this is was an ORDictionary<string, ORSet<string>> under #4302
var storedData = dataEnvelope.Data;

// simulate merging an update
var merged1 = (LWWDictionary<string, string>)m2.Merge(storedData);

merged1.Entries["a"].Should().BeEquivalentTo("A");
merged1.Entries["b"].Should().BeEquivalentTo("B2");
merged1.Entries["c"].Should().BeEquivalentTo("C");
}
}
}
93 changes: 93 additions & 0 deletions src/contrib/cluster/Akka.DistributedData.Tests/ReplicatorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static ReplicatorSpecs()
private readonly ORDictionaryKey<string, Flag> _keyH = new ORDictionaryKey<string, Flag>("H");
private readonly GSetKey<string> _keyI = new GSetKey<string>("I");
private readonly ORMultiValueDictionaryKey<string, string> _keyJ = new ORMultiValueDictionaryKey<string, string>("J");
private readonly LWWDictionaryKey<string, string> _keyK = new LWWDictionaryKey<string, string>("K");

public ReplicatorSpecs(ITestOutputHelper helper) : base(SpecConfig, helper)
{
Expand Down Expand Up @@ -256,6 +257,98 @@ private async Task PNCounterDictionary_Should_Merge()
Sys.Log.Info("Done");
}

/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/4400
/// </summary>
[Fact]
public async Task Bugfix_4400_LWWDictionary_Merge()
{
await InitCluster();

var changedProbe = CreateTestProbe(_sys2);

// subscribe to updates for KeyJ, then
_replicator2.Tell(Dsl.Subscribe(_keyK, changedProbe.Ref));

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// update it with a replication factor of two
_replicator2.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.SetItem(Cluster.Cluster.Get(_sys2), "a", "A")));

// receive local update
var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{"a", "A" }
});
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// push update from node 1
// add item
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.SetItem(Cluster.Cluster.Get(_sys1), "a", "A1")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of update on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{"a", "A1" }
});
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// remove item
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.Remove(Cluster.Cluster.Get(_sys1), "a")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of remove on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> ());
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// send multiple updates
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x
.SetItem(Cluster.Cluster.Get(_sys1), "a", "A")
.SetItem(Cluster.Cluster.Get(_sys1), "b", "B")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of remove on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{ "a", "A" },
{ "b", "B" },
});
});
});
}

/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/4198
/// </summary>
Expand Down
90 changes: 87 additions & 3 deletions src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ internal interface ILWWDictionaryKey
Type ValueType { get; }
}

/// <summary>
/// INTERNAL API.
///
/// For serialization purposes.
/// </summary>
internal interface ILWWDictionaryDeltaOperation
{
ORDictionary.IDeltaOperation Underlying { get; }
}

/// <summary>
/// Typed key used to store <see cref="LWWDictionary{TKey,TValue}"/> replica
/// inside current <see cref="Replicator"/> key-value store.
Expand Down Expand Up @@ -329,7 +339,74 @@ public override string ToString()
return sb.ToString();
}

public ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Delta => Underlying.Delta;
#region delta

internal sealed class LWWDictionaryDelta : ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation, IReplicatedDeltaSize, ILWWDictionaryDeltaOperation
{
internal readonly ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Underlying;

public LWWDictionaryDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation underlying)
{
Underlying = underlying;
if (underlying is IReplicatedDeltaSize s)
{
DeltaSize = s.DeltaSize;
}
else
{
DeltaSize = 1;
}
}

public IReplicatedData Merge(IReplicatedData other)
{
if (other is LWWDictionaryDelta d)
{
return new LWWDictionaryDelta((ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation)Underlying.Merge(d.Underlying));
}

return new LWWDictionaryDelta((ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation)Underlying.Merge(other));
}

public IDeltaReplicatedData Zero => LWWDictionary<TKey, TValue>.Empty;

public override bool Equals(object obj)
{
return obj is LWWDictionary<TKey, TValue>.LWWDictionaryDelta operation &&
Equals(operation.Underlying);
}

public bool Equals(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation other)
{
if (other is ORDictionary<TKey, LWWRegister<TValue>>.DeltaGroup group)
{
if (Underlying is ORDictionary<TKey, LWWRegister<TValue>>.DeltaGroup ourGroup)
{
return ourGroup.Operations.SequenceEqual(group.Operations);
}

if (group.Operations.Length == 1)
{
return Underlying.Equals(group.Operations.First());
}

return false;
}
return Underlying.Equals(other);
}

public override int GetHashCode()
{
return Underlying.GetHashCode();
}

public int DeltaSize { get; }
ORDictionary.IDeltaOperation ILWWDictionaryDeltaOperation.Underlying => (ORDictionary.IDeltaOperation)Underlying;
}

// TODO: optimize this so it doesn't allocate each time it's called
public ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Delta =>
new LWWDictionaryDelta(Underlying.Delta);

IReplicatedDelta IDeltaReplicatedData.Delta => Delta;

Expand All @@ -338,12 +415,19 @@ IReplicatedData IDeltaReplicatedData.MergeDelta(IReplicatedDelta delta) =>

IReplicatedData IDeltaReplicatedData.ResetDelta() => ResetDelta();

public LWWDictionary<TKey, TValue> MergeDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation delta) =>
new LWWDictionary<TKey, TValue>(Underlying.MergeDelta(delta));
public LWWDictionary<TKey, TValue> MergeDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation delta)
{
if (delta is LWWDictionaryDelta lwwd)
delta = lwwd.Underlying;

return new LWWDictionary<TKey, TValue>(Underlying.MergeDelta(delta));
}

public LWWDictionary<TKey, TValue> ResetDelta() =>
new LWWDictionary<TKey, TValue>(Underlying.ResetDelta());

#endregion

public Type KeyType => typeof(TKey);
public Type ValueType => typeof(TValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public sealed class ReplicatedDataSerializer : SerializerWithStringManifest
private const string ORMapUpdateManifest = "Hu";
private const string ORMapDeltaGroupManifest = "Hg";
private const string LWWMapManifest = "I";
private const string LWWMapDeltaGroupManifest = "Ig";
private const string LWWMapKeyManifest = "i";
private const string PNCounterMapManifest = "J";
private const string PNCounterMapDeltaOperationManifest = "Jo";
Expand Down Expand Up @@ -86,6 +87,7 @@ public override byte[] ToBinary(object obj)
case IORDictionary o: return SerializationSupport.Compress(ToProto(o));
case ORDictionary.IDeltaOperation p: return ToProto(p).ToByteArray();
case ILWWDictionary l: return SerializationSupport.Compress(ToProto(l));
case ILWWDictionaryDeltaOperation ld: return ToProto(ld.Underlying).ToByteArray();
case IPNCounterDictionary pn: return SerializationSupport.Compress(ToProto(pn));
case IPNCounterDictionaryDeltaOperation pnd: return ToProto(pnd.Underlying).ToByteArray();
case IORMultiValueDictionary m: return SerializationSupport.Compress(ToProto(m));
Expand Down Expand Up @@ -121,6 +123,8 @@ public override object FromBinary(byte[] bytes, string manifest)
case ORMapUpdateManifest: return ORDictionaryUpdateFromBinary(bytes);
case ORMapDeltaGroupManifest: return ORDictionaryDeltaGroupFromBinary(bytes);
case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case LWWMapDeltaGroupManifest:
return LWWDictionaryDeltaGroupFromBinary(bytes);
case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.Decompress(bytes));
case PNCounterMapDeltaOperationManifest: return PNCounterDeltaFromBinary(bytes);
case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.Decompress(bytes));
Expand Down Expand Up @@ -166,6 +170,7 @@ public override string Manifest(object o)
case ORDictionary.IRemoveKeyDeltaOp _: return ORMapRemoveKeyManifest;
case ORDictionary.IUpdateDeltaOp _: return ORMapUpdateManifest;
case ILWWDictionary _: return LWWMapManifest;
case ILWWDictionaryDeltaOperation _: return LWWMapDeltaGroupManifest;
case IPNCounterDictionary _: return PNCounterMapManifest;
case IPNCounterDictionaryDeltaOperation _: return PNCounterMapDeltaOperationManifest;
case IORMultiValueDictionary _: return ORMultiMapManifest;
Expand Down Expand Up @@ -1191,6 +1196,26 @@ private ILWWDictionary LWWDictionaryFromBinary(byte[] bytes)
return LWWDictFromProto(proto);
}


private object LWWDictionaryDeltaGroupFromBinary(byte[] bytes)
{
var proto = Proto.Msg.ORMapDeltaGroup.Parser.ParseFrom(bytes);
var orDictOp = ORDictionaryDeltaGroupFromProto(proto);

var orSetType = orDictOp.ValueType.GenericTypeArguments[0];
var maker = LWWDictionaryDeltaMaker.MakeGenericMethod(orDictOp.KeyType, orSetType);
return (ILWWDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp });
}

private static readonly MethodInfo LWWDictionaryDeltaMaker =
typeof(ReplicatedDataSerializer).GetMethod(nameof(LWWDictionaryDeltaFromProto), BindingFlags.Instance | BindingFlags.NonPublic);

private ILWWDictionaryDeltaOperation LWWDictionaryDeltaFromProto<TKey, TValue>(ORDictionary.IDeltaOperation op)
{
var casted = (ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation)op;
return new LWWDictionary<TKey, TValue>.LWWDictionaryDelta(casted);
}

#endregion

#region PNCounterDictionary
Expand Down

0 comments on commit a804b05

Please sign in to comment.