Skip to content

Commit

Permalink
fixed all replicated data structure tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Aug 27, 2016
1 parent 2e20820 commit e7145fc
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void A_LWWDictionary_should_be_able_to_have_its_entries_correctly_merged_
{
var m1 = LWWDictionary.Create(
Tuple.Create(_node1, "a", 1),
Tuple.Create(_node2, "b", 2));
Tuple.Create(_node1, "b", 2));
var m2 = LWWDictionary.Create(_node2, "c", 3);

var expected = ImmutableDictionary.CreateRange(new[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void A_ORDictionary_should_be_able_to_update_an_entry()
Assert.Equal(ImmutableHashSet.Create("B2"), merged2["b"].Elements);
Assert.Equal(ImmutableHashSet.Create("C"), merged2["c"].Elements);

var m4 = merged1.AddOrUpdate(_node2, "b", ORSet<string>.Empty, old => old.Add(_node1, "B3"));
var m4 = merged1.AddOrUpdate(_node2, "b", ORSet<string>.Empty, old => old.Add(_node2, "B3"));
var merged3 = m3.Merge(m4);
Assert.Equal(ImmutableHashSet.Create("A"), merged3["a"].Elements);
Assert.Equal(ImmutableHashSet.Create("B2", "B3"), merged3["b"].Elements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void A_ORMultiDictionary_should_be_able_to_have_its_entries_correctly_mer
.AddItem(_node1, "d", "D1");

var m2 = ORMultiDictionary<string, string>.Empty
.AddItem(_node2, "c", "C1")
.AddItem(_node2, "c", "C2")
.AddItem(_node2, "a", "A2")
.AddItem(_node2, "b", "B2")
.RemoveItem(_node2, "b", "B2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void A_ORSet_should_be_able_to_have_its_element_set_correctly_merged_for_
Assert.Contains(_user3, c1.Elements);

// set 1
var c2 = ORSet<string>.Empty.Add(_node2, _user1).Remove(_node2, _user2).Remove(_node2, _user3);
var c2 = c1.Add(_node2, _user1).Remove(_node2, _user2).Remove(_node2, _user3);
Assert.Contains(_user1, c2.Elements);
Assert.DoesNotContain(_user2, c2.Elements);
Assert.DoesNotContain(_user3, c2.Elements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public ORMultiDictionary<TKey, TValue> RemoveItem(UniqueAddress node, TKey key,
{
var newUnderlying = _underlying.AddOrUpdate(node, key, ORSet<TValue>.Empty, set => set.Remove(node, element));
ORSet<TValue> found;
if (_underlying.TryGetValue(key, out found) && found.IsEmpty)
if (newUnderlying.TryGetValue(key, out found) && found.IsEmpty)
{
newUnderlying = newUnderlying.Remove(node, key);
}
Expand Down
44 changes: 27 additions & 17 deletions src/contrib/cluster/Akka.DistributedData/ORSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,40 @@ public class ORSet<T> : FastMerge<ORSet<T>>, IORSet<T>, IReplicatedDataSerializa
private readonly IImmutableDictionary<T, VersionVector> _elementsMap;
private readonly VersionVector _versionVector;

private static VersionVector SubtractDots(VersionVector dot, VersionVector vector)
/// <summary>
/// INTERNAL API
/// Subtract the <paramref name="vvector"/> from the <paramref name="dot"/>.
/// What this means is that any (node, version) pair in
/// <paramref name="dot"/> that is &lt;= an entry in <paramref name="vvector"/> is removed from <paramref name="dot"/>.
/// Example [{a, 3}, {b, 2}, {d, 14}, {g, 22}] -
/// [{a, 4}, {b, 1}, {c, 1}, {d, 14}, {e, 5}, {f, 2}] =
/// [{b, 2}, {g, 22}]
/// </summary>
private static VersionVector SubtractDots(VersionVector dot, VersionVector vvector)
{
if (dot.IsEmpty) return VersionVector.Empty;
else

if (dot is SingleVersionVector)
{
// if dot is dominated by version vector, drop it
var single = (SingleVersionVector)dot;
return vvector.VersionAt(single.Node) >= single.Version ? VersionVector.Empty : dot;
}

if (dot is MultiVersionVector)
{
if (dot is SingleVersionVector)
var multi = (MultiVersionVector)dot;
var acc = ImmutableDictionary<UniqueAddress, long>.Empty.ToBuilder();
foreach (var pair in multi.Versions)
{
var single = (SingleVersionVector)dot;
return vector.VersionAt(single.Node) >= single.Version ? VersionVector.Empty : dot;
var v2 = vvector.VersionAt(pair.Key);
if (v2 < pair.Value) acc.Add(pair);
}
else if (dot is MultiVersionVector)
{
var multi = (MultiVersionVector)dot;
var acc = ImmutableDictionary<UniqueAddress, long>.Empty.ToBuilder();
foreach (var pair in multi.Versions)
{
var v2 = vector.VersionAt(pair.Key);
if (v2 < pair.Value) acc.Add(pair);
}

return new MultiVersionVector(acc);
}
else throw new NotSupportedException("Cannot subtract dots from provided version vector");
return VersionVector.Create(acc.ToImmutable());
}

throw new NotSupportedException("Cannot subtract dots from provided version vector");
}

private static IImmutableDictionary<T, VersionVector> MergeCommonKeys(IEnumerable<T> commonKeys, ORSet<T> lhs, ORSet<T> rhs) => commonKeys.Aggregate(ImmutableDictionary<T, VersionVector>.Empty, (acc, k) =>
Expand Down
19 changes: 17 additions & 2 deletions src/contrib/cluster/Akka.DistributedData/VersionVector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Cluster;
using Akka.DistributedData.Proto;
using Akka.Util.Internal;
Expand All @@ -34,8 +35,17 @@ public enum Ordering
}

public static VersionVector Create(UniqueAddress node, long version) => new SingleVersionVector(node, version);
public static VersionVector Create(IEnumerable<KeyValuePair<UniqueAddress, long>> versions) =>
new MultiVersionVector(versions);

public static VersionVector Create(ImmutableDictionary<UniqueAddress, long> versions)
{
if (versions.IsEmpty) return VersionVector.Empty;
if (versions.Count == 1)
{
var v = versions.First();
return new SingleVersionVector(v.Key, v.Value);
}
return new MultiVersionVector(versions);
}

protected static readonly AtomicCounterLong Counter = new AtomicCounterLong(1L);

Expand Down Expand Up @@ -282,6 +292,8 @@ public override VersionVector Prune(UniqueAddress removedNode, UniqueAddress col

public override VersionVector PruningCleanup(UniqueAddress removedNode) =>
Node == removedNode ? Empty : this;

public override string ToString() => $"VersionVector({Node}->{Version})";
}

[Serializable]
Expand Down Expand Up @@ -346,5 +358,8 @@ public override VersionVector Prune(UniqueAddress removedNode, UniqueAddress col

public override VersionVector PruningCleanup(UniqueAddress removedNode) =>
new MultiVersionVector(Versions.Remove(removedNode));

public override string ToString() =>
$"VersionVector({string.Join(";", Versions.Select(kv => $"({kv.Key}->{kv.Value})"))})";
}
}

0 comments on commit e7145fc

Please sign in to comment.