From e7145fcf38b8d990ebe686d0620224f7eaf1a0b9 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sat, 27 Aug 2016 16:21:32 +0200 Subject: [PATCH] fixed all replicated data structure tests --- .../LWWDictionarySpec.cs | 2 +- .../ORDictionarySpec.cs | 2 +- .../ORMultiDictionarySpec.cs | 2 +- .../Akka.DistributedData.Tests/ORSetSpec.cs | 2 +- .../Akka.DistributedData/ORMultiDictionary.cs | 2 +- .../cluster/Akka.DistributedData/ORSet.cs | 44 ++++++++++++------- .../Akka.DistributedData/VersionVector.cs | 19 +++++++- 7 files changed, 49 insertions(+), 24 deletions(-) diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs index cbb85f85498..808c5c04fd4 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/LWWDictionarySpec.cs @@ -38,7 +38,7 @@ public void A_LWWDictionary_should_be_able_to_have_its_entries_correctly_merged_ { var m1 = LWWDictionary.Create( Tuple.Create(_node1, "a", 1), - Tuple.Create(_node2, "b", 2)); + Tuple.Create(_node1, "b", 2)); var m2 = LWWDictionary.Create(_node2, "c", 3); var expected = ImmutableDictionary.CreateRange(new[] diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs index 6832e28c2bd..90129050b33 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORDictionarySpec.cs @@ -167,7 +167,7 @@ public void A_ORDictionary_should_be_able_to_update_an_entry() Assert.Equal(ImmutableHashSet.Create("B2"), merged2["b"].Elements); Assert.Equal(ImmutableHashSet.Create("C"), merged2["c"].Elements); - var m4 = merged1.AddOrUpdate(_node2, "b", ORSet.Empty, old => old.Add(_node1, "B3")); + var m4 = merged1.AddOrUpdate(_node2, "b", ORSet.Empty, old => old.Add(_node2, "B3")); var merged3 = m3.Merge(m4); Assert.Equal(ImmutableHashSet.Create("A"), merged3["a"].Elements); Assert.Equal(ImmutableHashSet.Create("B2", "B3"), merged3["b"].Elements); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs index b50b8814680..477df0fbb10 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORMultiDictionarySpec.cs @@ -99,7 +99,7 @@ public void A_ORMultiDictionary_should_be_able_to_have_its_entries_correctly_mer .AddItem(_node1, "d", "D1"); var m2 = ORMultiDictionary.Empty - .AddItem(_node2, "c", "C1") + .AddItem(_node2, "c", "C2") .AddItem(_node2, "a", "A2") .AddItem(_node2, "b", "B2") .RemoveItem(_node2, "b", "B2") diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs index f5b7182b33f..e2729980d43 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/ORSetSpec.cs @@ -179,7 +179,7 @@ public void A_ORSet_should_be_able_to_have_its_element_set_correctly_merged_for_ Assert.Contains(_user3, c1.Elements); // set 1 - var c2 = ORSet.Empty.Add(_node2, _user1).Remove(_node2, _user2).Remove(_node2, _user3); + var c2 = c1.Add(_node2, _user1).Remove(_node2, _user2).Remove(_node2, _user3); Assert.Contains(_user1, c2.Elements); Assert.DoesNotContain(_user2, c2.Elements); Assert.DoesNotContain(_user3, c2.Elements); diff --git a/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs b/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs index a673571995a..8c834197098 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORMultiDictionary.cs @@ -96,7 +96,7 @@ public ORMultiDictionary RemoveItem(UniqueAddress node, TKey key, { var newUnderlying = _underlying.AddOrUpdate(node, key, ORSet.Empty, set => set.Remove(node, element)); ORSet found; - if (_underlying.TryGetValue(key, out found) && found.IsEmpty) + if (newUnderlying.TryGetValue(key, out found) && found.IsEmpty) { newUnderlying = newUnderlying.Remove(node, key); } diff --git a/src/contrib/cluster/Akka.DistributedData/ORSet.cs b/src/contrib/cluster/Akka.DistributedData/ORSet.cs index 207ff3cecdd..ae24da80fad 100644 --- a/src/contrib/cluster/Akka.DistributedData/ORSet.cs +++ b/src/contrib/cluster/Akka.DistributedData/ORSet.cs @@ -79,30 +79,40 @@ public class ORSet : FastMerge>, IORSet, IReplicatedDataSerializa private readonly IImmutableDictionary _elementsMap; private readonly VersionVector _versionVector; - private static VersionVector SubtractDots(VersionVector dot, VersionVector vector) + /// + /// INTERNAL API + /// Subtract the from the . + /// What this means is that any (node, version) pair in + /// that is <= an entry in is removed from . + /// Example [{a, 3}, {b, 2}, {d, 14}, {g, 22}] - + /// [{a, 4}, {b, 1}, {c, 1}, {d, 14}, {e, 5}, {f, 2}] = + /// [{b, 2}, {g, 22}] + /// + private static VersionVector SubtractDots(VersionVector dot, VersionVector vvector) { if (dot.IsEmpty) return VersionVector.Empty; - else + + if (dot is SingleVersionVector) + { + // if dot is dominated by version vector, drop it + var single = (SingleVersionVector)dot; + return vvector.VersionAt(single.Node) >= single.Version ? VersionVector.Empty : dot; + } + + if (dot is MultiVersionVector) { - if (dot is SingleVersionVector) + var multi = (MultiVersionVector)dot; + var acc = ImmutableDictionary.Empty.ToBuilder(); + foreach (var pair in multi.Versions) { - var single = (SingleVersionVector)dot; - return vector.VersionAt(single.Node) >= single.Version ? VersionVector.Empty : dot; + var v2 = vvector.VersionAt(pair.Key); + if (v2 < pair.Value) acc.Add(pair); } - else if (dot is MultiVersionVector) - { - var multi = (MultiVersionVector)dot; - var acc = ImmutableDictionary.Empty.ToBuilder(); - foreach (var pair in multi.Versions) - { - var v2 = vector.VersionAt(pair.Key); - if (v2 < pair.Value) acc.Add(pair); - } - return new MultiVersionVector(acc); - } - else throw new NotSupportedException("Cannot subtract dots from provided version vector"); + return VersionVector.Create(acc.ToImmutable()); } + + throw new NotSupportedException("Cannot subtract dots from provided version vector"); } private static IImmutableDictionary MergeCommonKeys(IEnumerable commonKeys, ORSet lhs, ORSet rhs) => commonKeys.Aggregate(ImmutableDictionary.Empty, (acc, k) => diff --git a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs index 64c9f51bac8..9152816e12d 100644 --- a/src/contrib/cluster/Akka.DistributedData/VersionVector.cs +++ b/src/contrib/cluster/Akka.DistributedData/VersionVector.cs @@ -9,6 +9,7 @@ using System.Collections; using System.Collections.Generic; using System.Collections.Immutable; +using System.Linq; using Akka.Cluster; using Akka.DistributedData.Proto; using Akka.Util.Internal; @@ -34,8 +35,17 @@ public enum Ordering } public static VersionVector Create(UniqueAddress node, long version) => new SingleVersionVector(node, version); - public static VersionVector Create(IEnumerable> versions) => - new MultiVersionVector(versions); + + public static VersionVector Create(ImmutableDictionary versions) + { + if (versions.IsEmpty) return VersionVector.Empty; + if (versions.Count == 1) + { + var v = versions.First(); + return new SingleVersionVector(v.Key, v.Value); + } + return new MultiVersionVector(versions); + } protected static readonly AtomicCounterLong Counter = new AtomicCounterLong(1L); @@ -282,6 +292,8 @@ public override VersionVector Prune(UniqueAddress removedNode, UniqueAddress col public override VersionVector PruningCleanup(UniqueAddress removedNode) => Node == removedNode ? Empty : this; + + public override string ToString() => $"VersionVector({Node}->{Version})"; } [Serializable] @@ -346,5 +358,8 @@ public override VersionVector Prune(UniqueAddress removedNode, UniqueAddress col public override VersionVector PruningCleanup(UniqueAddress removedNode) => new MultiVersionVector(Versions.Remove(removedNode)); + + public override string ToString() => + $"VersionVector({string.Join(";", Versions.Select(kv => $"({kv.Key}->{kv.Value})"))})"; } } \ No newline at end of file