Skip to content

Commit

Permalink
Merge pull request #4408 from akkadotnet/dev
Browse files Browse the repository at this point in the history
v1.4.6 Release
  • Loading branch information
Aaronontheweb authored May 12, 2020
2 parents 241e851 + da7b07a commit 355089c
Show file tree
Hide file tree
Showing 24 changed files with 837 additions and 217 deletions.
17 changes: 17 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
#### 1.4.6 May 12 2020 ####
**Maintenance Release for Akka.NET 1.4**

Akka.NET v1.4.6 consists of mainly minor bug fixes and updates:

* [Akka.Cluster.DistributedData: Akka.DistributedData.ORDictionary Unable to cast object of type](https://github.com/akkadotnet/akka.net/issues/4400)
* [Akka: add CoordinatedShutdown.run-by-actor-system-terminated setting](https://github.com/akkadotnet/akka.net/issues/4203) - runs `CoordinatedShutdown` whenever `ActorSystem.Terminate()` is called.
* [Akka.Actor: Inconsistent application of SupervisorStrategy on Pool routers](https://github.com/akkadotnet/akka.net/issues/3626)

To see the full set of changes for Akka.NET 1.4.6, please [see the 1.4.6 milestone](https://github.com/akkadotnet/akka.net/milestone/37).

| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
| 3 | 746 | 96 | Gregorius Soedharmo |
| 1 | 65 | 117 | Igor Fedchenko |
| 1 | 10 | 5 | Bogdan-Rotund |

#### 1.4.5 April 29 2020 ####
**Maintenance Release for Akka.NET 1.4**

Expand Down
20 changes: 17 additions & 3 deletions docs/articles/actors/coordinated-shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,23 @@ It's safe to call this method multiple times as the shutdown process will only b
> It's possible to subclass the `CoordinatedShutdown.Reason` type and pass in a custom implementation which includes custom properties and data. This data is accessible inside the shutdown phases themselves via the [`CoordinatedShutdown.ShutdownReason` property](/api/Akka.Actor.CoordinatedShutdown.html#Akka_Actor_CoordinatedShutdown_ShutdownReason).
### Automatic `ActorSystem` and Process Termination
By default, when the final phase of the `CoordinatedShutdown` executes the calling `ActorSystem` will be terminated. However, the CLR process will still be running even though the `ActorSystem` has been terminated.
By default, when the final phase of the `CoordinatedShutdown` executes the calling `ActorSystem` will be terminated. This behaviour can be changed by setting the following HOCON value in your configuration:

If you'd like to automatically terminate the process running your `ActorSystem`, you can set the following HOCON value in your configuration:
```
akka.coordinated-shutdown.terminate-actor-system = off
```
If this setting is disabled (it is enabled b default), the `ActorSystem` will not be terminated as the final phase of the `CoordinatedShutdown` phases.

`CoordinatedShutdown` phases, by default, are also executed when the `ActorSystem` is terminated. You can change this behavior by disabling this HOCON value in your configuration:

```
akka.coordinated-shutdown.run-by-actor-system-terminate = off
```

> [!NOTE]
> It is illegal to have _run-by-actor-system-terminate_ enabled and have _terminate-actor-system_ disabled. Having these configuration combination will raise a `ConfigurationException` during `ActorSystem` startup.
The CLR process will still be running, even when the `ActorSystem` is terminated by the `CoordinatedShutdown`. If you'd like to automatically terminate the process running your `ActorSystem`, you can set the following HOCON value in your configuration:

```
akka.coordinated-shutdown.exit-clr = on
Expand All @@ -154,7 +168,7 @@ By default, this graceful leave action will by triggered whenever the `Coordinat
`CoordinatedShutdown.Run()` will also be executed if a node is removed via `Cluster.Down` (non-graceful exit), but this can be disabled by changing the following Akka.Cluster HOCON setting:

```
akka.cluster.run-coordinated-shutdown-when-down = off
akka.cluster.run-coordinated-shutdown-when-down = off
```

### Invoking `CoordinatedShutdown.Run()` on Process Exit
Expand Down
22 changes: 20 additions & 2 deletions src/Akka.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28307.645
# Visual Studio Version 16
VisualStudioVersion = 16.0.29911.84
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmark", "Benchmark", "{73108242-625A-4D7B-AA09-63375DBAE464}"
EndProject
Expand Down Expand Up @@ -217,6 +217,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Coordination", "core\A
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Coordination.Tests", "core\Akka.Coordination.Tests\Akka.Coordination.Tests.csproj", "{6AA74665-DFE0-450B-8F66-19125ABBB1C7}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "TcpEchoServer", "TcpEchoServer", "{52A36134-AC41-4F38-9D47-1124B0C9CDD2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TcpEchoService.Server", "examples\TcpEchoService.Server\TcpEchoService.Server.csproj", "{8AD2DF54-B79F-490B-B2C6-94EDA397055F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -976,6 +980,18 @@ Global
{6AA74665-DFE0-450B-8F66-19125ABBB1C7}.Release|x64.Build.0 = Release|Any CPU
{6AA74665-DFE0-450B-8F66-19125ABBB1C7}.Release|x86.ActiveCfg = Release|Any CPU
{6AA74665-DFE0-450B-8F66-19125ABBB1C7}.Release|x86.Build.0 = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|x64.ActiveCfg = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|x64.Build.0 = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|x86.ActiveCfg = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Debug|x86.Build.0 = Debug|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|Any CPU.Build.0 = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|x64.ActiveCfg = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|x64.Build.0 = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|x86.ActiveCfg = Release|Any CPU
{8AD2DF54-B79F-490B-B2C6-94EDA397055F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1070,6 +1086,8 @@ Global
{FC5216C2-3C98-4691-8E72-205EBF9594D0} = {76F58DC4-19F1-43EF-A6E2-EC1CC8395AC5}
{ADDA116F-7AB5-41DE-A95B-D5E64688E9A0} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
{6AA74665-DFE0-450B-8F66-19125ABBB1C7} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
{52A36134-AC41-4F38-9D47-1124B0C9CDD2} = {D3AF8295-AEB5-4324-AA82-FCC0014AC310}
{8AD2DF54-B79F-490B-B2C6-94EDA397055F} = {52A36134-AC41-4F38-9D47-1124B0C9CDD2}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using Akka.DistributedData.Internal;

namespace Akka.DistributedData.Tests
{
Expand Down Expand Up @@ -125,5 +127,39 @@ public void LWWDictionary_must_be_able_to_work_with_deltas()
{"c", 3}
});
}

/// <summary>
/// Bug reproduction: https://github.com/akkadotnet/akka.net/issues/4400
/// </summary>
[Fact]
public async Task Bugfix_4400_LWWDictionary_Deltas_must_merge_other_LWWDictionary()
{
var m1 = LWWDictionary<string, string>.Empty
.SetItem(_node1, "a", "A")
.SetItem(_node1, "b", "B1");

await Task.Delay(200);

var m2 = LWWDictionary<string, string>.Empty
.SetItem(_node2, "c", "C")
.SetItem(_node2, "b", "B2");

// This is how deltas really get merged inside the replicator
var dataEnvelope = new DataEnvelope(m1.Delta);
if (dataEnvelope.Data is IReplicatedDelta withDelta)
{
dataEnvelope = dataEnvelope.WithData(withDelta.Zero.MergeDelta(withDelta));
}

// Bug: this is was an ORDictionary<string, ORSet<string>> under #4302
var storedData = dataEnvelope.Data;

// simulate merging an update
var merged1 = (LWWDictionary<string, string>)m2.Merge(storedData);

merged1.Entries["a"].Should().BeEquivalentTo("A");
merged1.Entries["b"].Should().BeEquivalentTo("B2");
merged1.Entries["c"].Should().BeEquivalentTo("C");
}
}
}
93 changes: 93 additions & 0 deletions src/contrib/cluster/Akka.DistributedData.Tests/ReplicatorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static ReplicatorSpecs()
private readonly ORDictionaryKey<string, Flag> _keyH = new ORDictionaryKey<string, Flag>("H");
private readonly GSetKey<string> _keyI = new GSetKey<string>("I");
private readonly ORMultiValueDictionaryKey<string, string> _keyJ = new ORMultiValueDictionaryKey<string, string>("J");
private readonly LWWDictionaryKey<string, string> _keyK = new LWWDictionaryKey<string, string>("K");

public ReplicatorSpecs(ITestOutputHelper helper) : base(SpecConfig, helper)
{
Expand Down Expand Up @@ -256,6 +257,98 @@ private async Task PNCounterDictionary_Should_Merge()
Sys.Log.Info("Done");
}

/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/4400
/// </summary>
[Fact]
public async Task Bugfix_4400_LWWDictionary_Merge()
{
await InitCluster();

var changedProbe = CreateTestProbe(_sys2);

// subscribe to updates for KeyJ, then
_replicator2.Tell(Dsl.Subscribe(_keyK, changedProbe.Ref));

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// update it with a replication factor of two
_replicator2.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.SetItem(Cluster.Cluster.Get(_sys2), "a", "A")));

// receive local update
var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{"a", "A" }
});
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// push update from node 1
// add item
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.SetItem(Cluster.Cluster.Get(_sys1), "a", "A1")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of update on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{"a", "A1" }
});
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// remove item
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x.Remove(Cluster.Cluster.Get(_sys1), "a")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of remove on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> ());
});
});

Within(TimeSpan.FromSeconds(2), () =>
{
AwaitAssert(() =>
{
// send multiple updates
_replicator1.Tell(Dsl.Update(
_keyK,
LWWDictionary<string, string>.Empty,
WriteLocal.Instance,
x => x
.SetItem(Cluster.Cluster.Get(_sys1), "a", "A")
.SetItem(Cluster.Cluster.Get(_sys1), "b", "B")));

var entries = changedProbe.ExpectMsg<Changed>(g => Equals(g.Key, _keyK)).Get(_keyK).Entries;
// expect replication of remove on node 2
entries.ShouldAllBeEquivalentTo(new Dictionary<string, string> {
{ "a", "A" },
{ "b", "B" },
});
});
});
}

/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/4198
/// </summary>
Expand Down
90 changes: 87 additions & 3 deletions src/contrib/cluster/Akka.DistributedData/LWWDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ internal interface ILWWDictionaryKey
Type ValueType { get; }
}

/// <summary>
/// INTERNAL API.
///
/// For serialization purposes.
/// </summary>
internal interface ILWWDictionaryDeltaOperation
{
ORDictionary.IDeltaOperation Underlying { get; }
}

/// <summary>
/// Typed key used to store <see cref="LWWDictionary{TKey,TValue}"/> replica
/// inside current <see cref="Replicator"/> key-value store.
Expand Down Expand Up @@ -329,7 +339,74 @@ public override string ToString()
return sb.ToString();
}

public ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Delta => Underlying.Delta;
#region delta

internal sealed class LWWDictionaryDelta : ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation, IReplicatedDeltaSize, ILWWDictionaryDeltaOperation
{
internal readonly ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Underlying;

public LWWDictionaryDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation underlying)
{
Underlying = underlying;
if (underlying is IReplicatedDeltaSize s)
{
DeltaSize = s.DeltaSize;
}
else
{
DeltaSize = 1;
}
}

public IReplicatedData Merge(IReplicatedData other)
{
if (other is LWWDictionaryDelta d)
{
return new LWWDictionaryDelta((ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation)Underlying.Merge(d.Underlying));
}

return new LWWDictionaryDelta((ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation)Underlying.Merge(other));
}

public IDeltaReplicatedData Zero => LWWDictionary<TKey, TValue>.Empty;

public override bool Equals(object obj)
{
return obj is LWWDictionary<TKey, TValue>.LWWDictionaryDelta operation &&
Equals(operation.Underlying);
}

public bool Equals(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation other)
{
if (other is ORDictionary<TKey, LWWRegister<TValue>>.DeltaGroup group)
{
if (Underlying is ORDictionary<TKey, LWWRegister<TValue>>.DeltaGroup ourGroup)
{
return ourGroup.Operations.SequenceEqual(group.Operations);
}

if (group.Operations.Length == 1)
{
return Underlying.Equals(group.Operations.First());
}

return false;
}
return Underlying.Equals(other);
}

public override int GetHashCode()
{
return Underlying.GetHashCode();
}

public int DeltaSize { get; }
ORDictionary.IDeltaOperation ILWWDictionaryDeltaOperation.Underlying => (ORDictionary.IDeltaOperation)Underlying;
}

// TODO: optimize this so it doesn't allocate each time it's called
public ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation Delta =>
new LWWDictionaryDelta(Underlying.Delta);

IReplicatedDelta IDeltaReplicatedData.Delta => Delta;

Expand All @@ -338,12 +415,19 @@ IReplicatedData IDeltaReplicatedData.MergeDelta(IReplicatedDelta delta) =>

IReplicatedData IDeltaReplicatedData.ResetDelta() => ResetDelta();

public LWWDictionary<TKey, TValue> MergeDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation delta) =>
new LWWDictionary<TKey, TValue>(Underlying.MergeDelta(delta));
public LWWDictionary<TKey, TValue> MergeDelta(ORDictionary<TKey, LWWRegister<TValue>>.IDeltaOperation delta)
{
if (delta is LWWDictionaryDelta lwwd)
delta = lwwd.Underlying;

return new LWWDictionary<TKey, TValue>(Underlying.MergeDelta(delta));
}

public LWWDictionary<TKey, TValue> ResetDelta() =>
new LWWDictionary<TKey, TValue>(Underlying.ResetDelta());

#endregion

public Type KeyType => typeof(TKey);
public Type ValueType => typeof(TValue);
}
Expand Down
Loading

0 comments on commit 355089c

Please sign in to comment.