From c95b2d2d1e847d532b955a7ab1f93d5527836dd8 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sun, 5 Feb 2017 17:07:03 +0100 Subject: [PATCH] Initial replacement of Helios with DotNetty (#2444) * initial replacement of Helios with DotNetty * hack for client to not connet to IPAddress.Any * fixed public-hostname resolution * initialized ssl tests * integrated with DotNetty logging pipeline * working ssl spec * moved Helios transport to separate project + DotNetty backward compatibility spec for config * working configuration compatibility between DotNetty & Helios * minor fixes * fixed tests logging and ssl tests * setup byte-order for DotNetty transport * fixed compatibility spec * fixed sln file * fixed MessageSerializerRemotingSpec * fixed problems with F# projects * fixed DNS public hostname specs * fixed F# files again * skipped ssl test for linux/mono * last polishing * post-rebase fixes --- .gitignore | 3 + src/Akka.sln | 14 + src/SharedAssemblyInfo.cs | 4 +- .../ClusterClient/ClusterClientSpec.cs | 2 +- .../DistributedPubSubRestartSpec.cs | 2 +- .../LocalConcurrencySpec.cs | 2 +- .../ReplicatedDataSerializerSpec.cs | 2 +- .../ReplicatorMessageSerializerSpec.cs | 2 +- .../WriteAggregatorSpec.cs | 2 +- .../testkits/Akka.TestKit.Xunit2/TestKit.cs | 19 +- .../Akka.Remote.Transport.Helios.csproj | 84 +++ .../Akka.Remote.Transport.Helios.nuspec | 20 + .../HeliosHelpers.cs | 25 +- .../HeliosTcpTransport.cs | 318 ++++++++++++ .../HeliosTransport.cs | 227 +------- .../HeliosTransportSettings.cs | 200 ++++++++ .../Properties/AssemblyInfo.cs | 36 ++ .../packages.config | 5 + .../Akka.Remote.Transport.Helios/remote.conf | 142 +++++ .../QuickRestartSpec.cs | 2 +- .../RestartFirstSeedNodeSpec.cs | 2 +- .../RestartNode3Spec.cs | 2 +- .../RestartNodeSpec.cs | 2 +- .../UnreachableNodeJoinsAgainSpec.cs | 2 +- .../Akka.Cluster.Tests/ClusterDeployerSpec.cs | 2 +- .../ClusterDomainEventPublisherSpec.cs | 2 +- src/core/Akka.Cluster.Tests/ClusterSpec.cs | 4 +- .../Routing/ClusterRouterAsk1343BugFixSpec.cs | 2 +- .../Routing/ClusterRouterSupervisorSpec.cs | 2 +- .../StartupWithOneThreadSpec.cs | 2 +- .../Akka.FSharp.Tests.fsproj | 4 +- src/core/Akka.FSharp.Tests/ApiTests.fs | 2 +- src/core/Akka.FSharp.Tests/app.config | 2 +- src/core/Akka.FSharp/Akka.FSharp.fsproj | 4 +- .../Akka.FSharp/Properties/AssemblyInfo.fs | 6 +- src/core/Akka.FSharp/app.config | 2 +- .../Akka.Persistence.FSharp.fsproj | 4 +- src/core/Akka.Persistence.FSharp/app.config | 2 +- .../Serialization/SerializerSpec.cs | 3 +- src/core/Akka.Remote.TestKit/MultiNodeSpec.cs | 8 +- .../Akka.Remote.TestKit/RemoteConnection.cs | 2 +- .../RemoteNodeRestartDeathWatchSpec.cs | 2 +- .../RemoteNodeRestartGateSpec.cs | 2 +- .../RemoteNodeShutdownAndComesBackSpec.cs | 2 +- .../RemoteQuarantinePiercingSpec.cs | 4 +- .../RemoteRestartedQuarantinedSpec.cs | 2 +- .../HeliosRemoteMessagingThroughputSpec.cs | 6 +- src/core/Akka.Remote.Tests/ActorsLeakSpec.cs | 15 +- .../Akka.Remote.Tests.csproj | 51 +- .../Akka.Remote.Tests/RemoteConfigSpec.cs | 26 +- .../Akka.Remote.Tests/RemoteDeathWatchSpec.cs | 4 +- .../Akka.Remote.Tests/RemoteDeployerSpec.cs | 2 +- .../RemoteMessageLocalDeliverySpec.cs | 9 +- .../Akka.Remote.Tests/RemoteMetricsSpec.cs | 9 +- .../Akka.Remote.Tests/RemoteRouterSpec.cs | 7 +- .../Akka.Remote.Tests/RemoteWatcherSpec.cs | 7 +- src/core/Akka.Remote.Tests/RemotingSpec.cs | 10 +- .../RemotingTerminatorSpecs.cs | 5 +- .../Resources/akka-validcert.pfx | Bin 0 -> 2509 bytes .../DaemonMsgCreateSerializerSpec.cs | 2 +- .../Transport/AkkaProtocolStressTest.cs | 6 +- .../Transport/BackwardCompatibilitySpec.cs | 146 ++++++ .../Transport/DotNettySslSupportSpec.cs | 129 +++++ ... => DotNettyTransportDnsResolutionSpec.cs} | 30 +- .../ThrottlerTransportAdapterSpec.cs | 11 +- src/core/Akka.Remote.Tests/UntrustedSpec.cs | 9 +- src/core/Akka.Remote.Tests/app.config | 8 +- src/core/Akka.Remote.Tests/packages.config | 20 + src/core/Akka.Remote/Akka.Remote.csproj | 32 +- .../Akka.Remote/Configuration/Remote.conf | 24 +- src/core/Akka.Remote/Endpoint.cs | 2 +- .../Transport/AkkaProtocolTransport.cs | 2 +- .../Transport/DotNetty/AkkaLoggingHandler.cs | 219 ++++++++ .../Transport/DotNetty/DotNettyTransport.cs | 483 ++++++++++++++++++ .../DotNetty/DotNettyTransportSettings.cs | 284 ++++++++++ .../Transport/DotNetty/TcpTransport.cs | 235 +++++++++ src/core/Akka.Remote/packages.config | 6 +- .../Akka.Tests.Shared.Internals/AkkaSpec.cs | 4 +- src/examples/Chat/ChatClient/Program.cs | 2 +- src/examples/Chat/ChatServer/Program.cs | 2 +- .../ClusterSharding.Node/App.config | 2 +- .../ClusterToolsExample.Node/App.config | 2 +- .../ClusterToolsExample.Seed/App.config | 2 +- .../Samples.Cluster.Transformation/App.config | 2 +- .../Samples.Cluster.Transformation/Program.cs | 4 +- .../App.config | 2 +- .../Program.cs | 4 +- .../Cluster/Samples.Cluster.Simple/App.config | 2 +- .../Cluster/Samples.Cluster.Simple/Program.cs | 2 +- src/examples/FSharp.Api/App.config | 2 +- src/examples/FSharp.Api/FSharp.Api.fsproj | 4 +- src/examples/FSharp.Deploy.Local/App.config | 2 +- .../FSharp.Deploy.Local.fsproj | 46 +- src/examples/FSharp.Deploy.Local/Program.fs | 2 +- src/examples/FSharp.Deploy.Remote/App.config | 2 +- .../FSharp.Deploy.Remote.fsproj | 46 +- src/examples/FSharp.Deploy.Remote/Program.fs | 2 +- .../HelloWorld/FSharp.HelloAkka/App.config | 2 +- .../FSharp.HelloAkka/FSharp.HelloAkka.fsproj | 42 +- .../PersistenceExample.FsApi/App.config | 2 +- .../PersistenceExample.FsApi.fsproj | 4 +- src/examples/RemoteDeploy/System1/Program.cs | 2 +- src/examples/RemoteDeploy/System2/Program.cs | 2 +- src/examples/TimeServer/TimeClient/App.config | 4 +- src/examples/TimeServer/TimeServer/App.config | 4 +- 105 files changed, 2702 insertions(+), 479 deletions(-) create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.csproj create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.nuspec rename src/{core/Akka.Remote/Transport/Helios => contrib/transports/Akka.Remote.Transport.Helios}/HeliosHelpers.cs (87%) create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTcpTransport.cs rename src/{core/Akka.Remote/Transport/Helios => contrib/transports/Akka.Remote.Transport.Helios}/HeliosTransport.cs (72%) create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransportSettings.cs create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/Properties/AssemblyInfo.cs create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/packages.config create mode 100644 src/contrib/transports/Akka.Remote.Transport.Helios/remote.conf create mode 100644 src/core/Akka.Remote.Tests/Resources/akka-validcert.pfx create mode 100644 src/core/Akka.Remote.Tests/Transport/BackwardCompatibilitySpec.cs create mode 100644 src/core/Akka.Remote.Tests/Transport/DotNettySslSupportSpec.cs rename src/core/Akka.Remote.Tests/Transport/{HeliosTransportDnsResolutionSpec.cs => DotNettyTransportDnsResolutionSpec.cs} (92%) create mode 100644 src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs create mode 100644 src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs create mode 100644 src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs create mode 100644 src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs diff --git a/.gitignore b/.gitignore index 45faba77c45..aa94efd498e 100644 --- a/.gitignore +++ b/.gitignore @@ -228,3 +228,6 @@ target/ # API Approval received files **/Akka.API.Tests/*.received.txt + +# include self-signed test certificate for SSL Tests +!src/core/Akka.Remote.Tests/Resources/akka-validcert.pfx \ No newline at end of file diff --git a/src/Akka.sln b/src/Akka.sln index 54e6e1f8b84..0ba21303d89 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -256,6 +256,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Serialization.Hyperion EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Serialization.Hyperion.Tests", "contrib\serializers\Akka.Serialization.Hyperion.Tests\Akka.Serialization.Hyperion.Tests.csproj", "{E3549E17-7206-44F0-A322-EDC058A0498F}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Transports", "Transports", "{6AD34FA4-7584-454D-ACD4-A8911E0BDB00}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Transport.Helios", "contrib\transports\Akka.Remote.Transport.Helios\Akka.Remote.Transport.Helios.csproj", "{29FEAABC-E326-450A-9008-B5FCECF0115F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug Mono|Any CPU = Debug Mono|Any CPU @@ -969,6 +973,14 @@ Global {E3549E17-7206-44F0-A322-EDC058A0498F}.Release Mono|Any CPU.Build.0 = Release|Any CPU {E3549E17-7206-44F0-A322-EDC058A0498F}.Release|Any CPU.ActiveCfg = Release|Any CPU {E3549E17-7206-44F0-A322-EDC058A0498F}.Release|Any CPU.Build.0 = Release|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Release Mono|Any CPU.Build.0 = Release|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {29FEAABC-E326-450A-9008-B5FCECF0115F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1085,5 +1097,7 @@ Global {53A657B3-D3CD-461E-9759-664331218060} = {76F58DC4-19F1-43EF-A6E2-EC1CC8395AC5} {6C995BF6-1F8D-40EE-B608-68E36EA6A6F8} = {0E55F1F8-E212-43D7-A0C0-ACEA9794B0D7} {E3549E17-7206-44F0-A322-EDC058A0498F} = {0E55F1F8-E212-43D7-A0C0-ACEA9794B0D7} + {6AD34FA4-7584-454D-ACD4-A8911E0BDB00} = {588C1513-FAB6-42C3-B6FC-3485F13620CF} + {29FEAABC-E326-450A-9008-B5FCECF0115F} = {6AD34FA4-7584-454D-ACD4-A8911E0BDB00} EndGlobalSection EndGlobal diff --git a/src/SharedAssemblyInfo.cs b/src/SharedAssemblyInfo.cs index 3b7ae7eb143..c4d948402d4 100644 --- a/src/SharedAssemblyInfo.cs +++ b/src/SharedAssemblyInfo.cs @@ -4,5 +4,5 @@ [assembly: AssemblyCompanyAttribute("Akka.NET Team")] [assembly: AssemblyCopyrightAttribute("Copyright © 2013-2017 Akka.NET Team")] [assembly: AssemblyTrademarkAttribute("")] -[assembly: AssemblyVersionAttribute("1.1.2.0")] -[assembly: AssemblyFileVersionAttribute("1.1.2.0")] +[assembly: AssemblyVersionAttribute("1.1.3.0")] +[assembly: AssemblyFileVersionAttribute("1.1.3.0")] diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientSpec.cs index 680fb621a5a..2c72e244abd 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientSpec.cs @@ -611,7 +611,7 @@ public void ClusterClient_must_reestablish_connection_to_receptionist_after_serv // start new system on same port var sys2 = ActorSystem.Create( Sys.Name, - ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + Cluster.Get(Sys).SelfAddress.Port).WithFallback(Sys.Settings.Config)); + ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + Cluster.Get(Sys).SelfAddress.Port).WithFallback(Sys.Settings.Config)); Cluster.Get(sys2).Join(Cluster.Get(sys2).SelfAddress); var service2 = sys2.ActorOf(Props.Create(() => new ClusterClientSpecConfig.TestService(TestActor)), "service2"); ClusterClientReceptionist.Get(sys2).RegisterService(service2); diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs index abd8391b3ed..e3db86f3cd4 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/PublishSubscribe/DistributedPubSubRestartSpec.cs @@ -146,7 +146,7 @@ public void A_Cluster_with_DistributedPubSub_must_handle_restart_of_nodes_with_s var newSystem = ActorSystem.Create( Sys.Name, ConfigurationFactory - .ParseString($"akka.remote.helios.tcp.port={Cluster.Get(Sys).SelfAddress.Port}") + .ParseString($"akka.remote.dot-netty.tcp.port={Cluster.Get(Sys).SelfAddress.Port}") .WithFallback(Sys.Settings.Config)); try diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs index c3237797f10..5cdef1ac125 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/LocalConcurrencySpec.cs @@ -39,7 +39,7 @@ public Updater() public LocalConcurrencySpec(ITestOutputHelper output) : base(ConfigurationFactory.ParseString(@" akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0"), + akka.remote.dot-netty.tcp.port = 0"), "LocalConcurrencySpec", output) { _replicator = DistributedData.Get(Sys).Replicator; diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs index fd795edf5ac..727e1423eb2 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatedDataSerializerSpec.cs @@ -25,7 +25,7 @@ public class ReplicatedDataSerializerSpec : TestKit.Xunit2.TestKit akka.actor { provider=""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" } - akka.remote.helios.tcp.port = 0").WithFallback(DistributedData.DefaultConfig()); + akka.remote.dot-netty.tcp.port = 0").WithFallback(DistributedData.DefaultConfig()); private readonly UniqueAddress _address1 = new UniqueAddress(new Address("akka.tcp", "sys", "some.host.org", 4711), 1); private readonly UniqueAddress _address2 = new UniqueAddress(new Address("akka.tcp", "sys", "other.host.org", 4711), 2); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs index 330c635d710..ad5f8c6520a 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/Serialization/ReplicatorMessageSerializerSpec.cs @@ -26,7 +26,7 @@ public class ReplicatorMessageSerializerSpec : TestKit.Xunit2.TestKit akka.actor { provider=""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" } - akka.remote.helios.tcp.port = 0").WithFallback(DistributedData.DefaultConfig()); + akka.remote.dot-netty.tcp.port = 0").WithFallback(DistributedData.DefaultConfig()); private readonly UniqueAddress _address1 = new UniqueAddress(new Address("akka.tcp", "sys", "some.host.org", 4711), 1); private readonly UniqueAddress _address2 = new UniqueAddress(new Address("akka.tcp", "sys", "other.host.org", 4711), 2); diff --git a/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs index 3911077d484..a91e3d706ce 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests/WriteAggregatorSpec.cs @@ -68,7 +68,7 @@ public WriteAckAdapter(IActorRef replica) public WriteAggregatorSpec(ITestOutputHelper output) : base(ConfigurationFactory.ParseString(@" akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0"), "WriteAggregatorSpec", output) + akka.remote.dot-netty.tcp.port = 0"), "WriteAggregatorSpec", output) { _nodes = ImmutableHashSet.CreateRange(new[] {_nodeA, _nodeB, _nodeC, _nodeD}); } diff --git a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs index aa033ab2f98..8c6ac58d5c9 100644 --- a/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs +++ b/src/contrib/testkits/Akka.TestKit.Xunit2/TestKit.cs @@ -23,6 +23,8 @@ public class TestKit : TestKitBase , IDisposable private static readonly XunitAssertions _assertions=new XunitAssertions(); private bool _isDisposed; //Automatically initialized to false; + protected readonly ITestOutputHelper Output; + /// /// Create a new instance of the for xUnit class. /// If no is passed in, a new system @@ -33,7 +35,8 @@ public class TestKit : TestKitBase , IDisposable public TestKit(ActorSystem system = null, ITestOutputHelper output = null) : base(_assertions, system) { - InitializeLogger(output); + Output = output; + InitializeLogger(Sys); } /// @@ -46,7 +49,8 @@ public TestKit(ActorSystem system = null, ITestOutputHelper output = null) public TestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(_assertions, config, actorSystemName) { - InitializeLogger(output); + Output = output; + InitializeLogger(Sys); } @@ -58,7 +62,8 @@ public TestKit(Config config, string actorSystemName = null, ITestOutputHelper o /// TBD public TestKit(string config, ITestOutputHelper output = null) : base(_assertions, ConfigurationFactory.ParseString(config)) { - InitializeLogger(output); + Output = output; + InitializeLogger(Sys); } /// @@ -107,12 +112,12 @@ public void Dispose() GC.SuppressFinalize(this); } - private void InitializeLogger(ITestOutputHelper output) + protected void InitializeLogger(ActorSystem system) { - if (output != null) + if (Output != null) { - var system = (ExtendedActorSystem) Sys; - var logger = system.SystemActorOf(Props.Create(() => new TestOutputLogger(output)), "log-test"); + var extSystem = (ExtendedActorSystem)system; + var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test"); logger.Tell(new InitializeLogger(system.EventStream)); } } diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.csproj b/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.csproj new file mode 100644 index 00000000000..874b14f7b0d --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.csproj @@ -0,0 +1,84 @@ + + + + + Debug + AnyCPU + {29FEAABC-E326-450A-9008-B5FCECF0115F} + Library + Properties + Akka.Remote.Transport.Helios + Akka.Remote.Transport.Helios + v4.5 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.dll + True + + + ..\..\..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.Serialization.dll + True + + + ..\..\..\packages\Helios.2.1.3\lib\net45\Helios.dll + True + + + + + + + + + + + + + + + + + + + + + + + + + {ea4ff8fd-7c53-49c8-b9aa-02e458b3e6a7} + Akka.Remote + + + {5deddf90-37f0-48d3-a0b0-a5cbd8a7e377} + Akka + + + + + \ No newline at end of file diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.nuspec b/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.nuspec new file mode 100644 index 00000000000..6af67c1c832 --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/Akka.Remote.Transport.Helios.nuspec @@ -0,0 +1,20 @@ + + + + @project@ + @project@@title@ + @build.number@ + @authors@ + @authors@ + (Legacy) Akka.NET remote transport layer based on Helios. + https://github.com/akkadotnet/akka.net/blob/master/LICENSE + https://github.com/akkadotnet/akka.net + http://getakka.net/images/AkkaNetLogo.Normal.png + false + @releaseNotes@ + @copyright@ + @tags@ + @dependencies@ + @references@ + + diff --git a/src/core/Akka.Remote/Transport/Helios/HeliosHelpers.cs b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosHelpers.cs similarity index 87% rename from src/core/Akka.Remote/Transport/Helios/HeliosHelpers.cs rename to src/contrib/transports/Akka.Remote.Transport.Helios/HeliosHelpers.cs index 688c92985d5..e5239029c35 100644 --- a/src/core/Akka.Remote/Transport/Helios/HeliosHelpers.cs +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosHelpers.cs @@ -1,24 +1,18 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- +#region copyright +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2017 Akka.NET project +// +// ----------------------------------------------------------------------- +#endregion using System; using System.Net; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; -using Google.ProtocolBuffers; -using Helios; -using Helios.Buffers; using Helios.Channels; -using Helios.Exceptions; -using Helios.Net; -using Helios.Ops; -using Helios.Serialization; -using Helios.Topology; namespace Akka.Remote.Transport.Helios { @@ -55,7 +49,7 @@ public override void ChannelActive(IChannelHandlerContext context) { if (!WrappedTransport.ConnectionGroup.TryAdd(context.Channel)) { - Log.Warning("Unable to REMOVE channel [{0}->{1}](Id={2}) to connection group. May not shut down cleanly.", + Log.Warning("Unable to REMOVE channel [{0}->{1}](Id={2}) to connection group. May not shut down cleanly.", context.Channel.LocalAddress, context.Channel.RemoteAddress, context.Channel.Id); } } @@ -137,4 +131,3 @@ protected void Init(IChannel channel, IPEndPoint remoteSocketAddress, Address re } } } - diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTcpTransport.cs b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTcpTransport.cs new file mode 100644 index 00000000000..242a1839c82 --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTcpTransport.cs @@ -0,0 +1,318 @@ +#region copyright +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2017 Akka.NET project +// +// ----------------------------------------------------------------------- +#endregion + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Event; +using Google.ProtocolBuffers; +using Helios.Buffers; +using Helios.Channels; +using Helios.Exceptions; +using Helios.Util; + +namespace Akka.Remote.Transport.Helios +{ + /// + /// INTERNAL API + /// + abstract class TcpHandlers : CommonHandlers + { + private IHandleEventListener _listener; + + /// + /// TBD + /// + /// TBD + protected void NotifyListener(IHandleEvent msg) + { + _listener?.Notify(msg); + } + + /// + /// TBD + /// + /// TBD + /// TBD + protected TcpHandlers(HeliosTransport wrappedTransport, ILoggingAdapter log) : base(wrappedTransport, log) + { + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + protected override void RegisterListener(IChannel channel, IHandleEventListener listener, object msg, IPEndPoint remoteAddress) + { + _listener = listener; + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + protected override AssociationHandle CreateHandle(IChannel channel, Address localAddress, Address remoteAddress) + { + return new TcpAssociationHandle(localAddress, remoteAddress, WrappedTransport, channel); + } + + /// + /// TBD + /// + /// TBD + public override void ChannelInactive(IChannelHandlerContext context) + { + NotifyListener(new Disassociated(DisassociateInfo.Unknown)); + base.ChannelInactive(context); + } + + /// + /// TBD + /// + /// TBD + /// TBD + public override void ChannelRead(IChannelHandlerContext context, object message) + { + var buf = (IByteBuf)message; + if (buf.ReadableBytes > 0) + { + // no need to copy the byte buffer contents; ByteString does that automatically + var bytes = ByteString.CopyFrom(buf.Array, buf.ArrayOffset + buf.ReaderIndex, buf.ReadableBytes); + NotifyListener(new InboundPayload(bytes)); + } + + // decrease the reference count to 0 (releases buffer) + ReferenceCountUtil.SafeRelease(message); + } + + /// + /// TBD + /// + /// TBD + /// TBD + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + var se = exception as SocketException; + if (se?.SocketErrorCode == SocketError.OperationAborted) + { + NotifyListener(new Disassociated(DisassociateInfo.Shutdown)); + } + else + { + base.ExceptionCaught(context, exception); + NotifyListener(new Disassociated(DisassociateInfo.Unknown)); + } + + context.CloseAsync(); // close the channel + } + } + + /// + /// TCP handlers for inbound connections + /// + internal sealed class TcpServerHandler : TcpHandlers + { + private readonly Task _associationEventListener; + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + public TcpServerHandler(HeliosTransport wrappedTransport, ILoggingAdapter log, Task associationEventListener) : base(wrappedTransport, log) + { + _associationEventListener = associationEventListener; + } + + /// + /// TBD + /// + /// TBD + public override void ChannelActive(IChannelHandlerContext context) + { + InitInbound(context.Channel, (IPEndPoint)context.Channel.RemoteAddress, null); + base.ChannelActive(context); + } + + void InitInbound(IChannel channel, IPEndPoint socketAddress, object msg) + { + // disable automatic reads + channel.Configuration.AutoRead = false; + + _associationEventListener.ContinueWith(r => + { + var listener = r.Result; + var remoteAddress = HeliosTransport.MapSocketToAddress(socketAddress, WrappedTransport.SchemeIdentifier, + WrappedTransport.System.Name); + AssociationHandle handle; + Init(channel, socketAddress, remoteAddress, msg, out handle); + listener.Notify(new InboundAssociation(handle)); + }, TaskContinuationOptions.OnlyOnRanToCompletion); + } + } + + /// + /// TCP handlers for outbound connections + /// + class TcpClientHandler : TcpHandlers + { + /// + /// TBD + /// + protected readonly TaskCompletionSource StatusPromise = new TaskCompletionSource(); + private readonly Address _remoteAddress; + /// + /// TBD + /// + public Task StatusFuture { get { return StatusPromise.Task; } } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + public TcpClientHandler(HeliosTransport wrappedTransport, ILoggingAdapter log, Address remoteAddress) : base(wrappedTransport, log) + { + _remoteAddress = remoteAddress; + } + + /// + /// TBD + /// + /// TBD + public override void ChannelActive(IChannelHandlerContext context) + { + InitOutbound(context.Channel, (IPEndPoint)context.Channel.RemoteAddress, null); + base.ChannelActive(context); + } + + void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg) + { + AssociationHandle handle; + Init(channel, socketAddress, _remoteAddress, msg, out handle); + StatusPromise.TrySetResult(handle); + } + } + + /// + /// INTERNAL API + /// + class TcpAssociationHandle : AssociationHandle + { + private readonly IChannel _channel; + private HeliosTransport _transport; + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + public TcpAssociationHandle(Address localAddress, Address remoteAddress, HeliosTransport transport, IChannel connection) + : base(localAddress, remoteAddress) + { + _channel = connection; + _transport = transport; + } + + /// + /// TBD + /// + /// TBD + /// TBD + public override bool Write(ByteString payload) + { + if (_channel.IsOpen && _channel.IsWritable) + { + _channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(payload.ToByteArray())); + return true; + } + return false; + } + + /// + /// TBD + /// + public override void Disassociate() + { + _channel.CloseAsync(); + } + } + + /// + /// TCP implementation of a . + /// + /// + /// Due to the connection-oriented nature of TCP connections, this transport doesn't have to do any + /// additional bookkeeping when transports are disposed or opened. + /// + /// + class HeliosTcpTransport : HeliosTransport + { + /// + /// TBD + /// + /// TBD + /// TBD + public HeliosTcpTransport(ActorSystem system, Config config) + : base(system, config, "akka.remote.helios.tcp") + { + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + protected override async Task AssociateInternal(Address remoteAddress) + { + try + { + var clientBootstrap = ClientFactory(remoteAddress); + var socketAddress = AddressToSocketAddress(remoteAddress); + + var associate = await clientBootstrap.ConnectAsync(socketAddress); + + var handler = (TcpClientHandler)associate.Pipeline.Last(); + return await handler.StatusFuture; + } + catch (AggregateException e) when (e.InnerException is ConnectException) + { + var heliosException = (ConnectException)e.InnerException; + var socketException = heliosException?.InnerException as SocketException; + + if (socketException?.SocketErrorCode == SocketError.ConnectionRefused) + { + throw new InvalidAssociationException(socketException.Message + " " + remoteAddress); + } + + throw new InvalidAssociationException("Failed to associate with " + remoteAddress, e); + } + catch (AggregateException e) when (e.InnerException is ConnectTimeoutException) + { + var heliosException = (ConnectTimeoutException)e.InnerException; + + throw new InvalidAssociationException(heliosException.Message); + } + } + } +} diff --git a/src/core/Akka.Remote/Transport/Helios/HeliosTransport.cs b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransport.cs similarity index 72% rename from src/core/Akka.Remote/Transport/Helios/HeliosTransport.cs rename to src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransport.cs index f0d9f66a1c3..ae38a67f925 100644 --- a/src/core/Akka.Remote/Transport/Helios/HeliosTransport.cs +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransport.cs @@ -1,19 +1,19 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- +#region copyright +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2017 Akka.NET project +// +// ----------------------------------------------------------------------- +#endregion using System; -using System.CodeDom.Compiler; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; -using Akka.Dispatch; using Akka.Event; using Akka.Util; using Helios.Channels; @@ -21,8 +21,6 @@ using Helios.Channels.Sockets; using Helios.Codecs; using Helios.Exceptions; -using Helios.Logging; -using Helios.Topology; using Helios.Util.Concurrency; using AtomicCounter = Helios.Util.AtomicCounter; using LengthFieldPrepender = Helios.Codecs.LengthFieldPrepender; @@ -79,193 +77,16 @@ public TcpTransportException(string message, Exception cause = null) : base(mess } } - /// - /// TBD - /// - internal class HeliosTransportSettings - { - /// - /// TBD - /// - internal readonly Config Config; - - /// - /// TBD - /// - /// TBD - public HeliosTransportSettings(Config config) - { - Config = config; - Init(); - } - static HeliosTransportSettings() - { - // Disable STDOUT logging for Helios in release mode -#if !DEBUG - LoggingFactory.DefaultFactory = new NoOpLoggerFactory(); -#endif - - } - - private void Init() - { - //TransportMode - var protocolString = Config.GetString("transport-protocol"); - if (protocolString.Equals("tcp")) TransportMode = new Tcp(); - else if (protocolString.Equals("udp")) TransportMode = new Udp(); - else throw new ConfigurationException(string.Format("Unknown transport transport-protocol='{0}'", protocolString)); - EnableSsl = Config.GetBoolean("enable-ssl"); - ConnectTimeout = Config.GetTimeSpan("connection-timeout"); - WriteBufferHighWaterMark = OptionSize("write-buffer-high-water-mark"); - WriteBufferLowWaterMark = OptionSize("write-buffer-low-water-mark"); - SendBufferSize = OptionSize("send-buffer-size"); - ReceiveBufferSize = OptionSize("receive-buffer-size"); - var size = OptionSize("maximum-frame-size"); - if (size == null || size < 32000) throw new ConfigurationException("Setting 'maximum-frame-size' must be at least 32000 bytes"); - MaxFrameSize = (int)size; - Backlog = Config.GetInt("backlog"); - TcpNoDelay = Config.GetBoolean("tcp-nodelay"); - TcpKeepAlive = Config.GetBoolean("tcp-keepalive"); - TcpReuseAddr = Config.GetBoolean("tcp-reuse-addr"); - var configHost = Config.GetString("hostname"); - var publicConfigHost = Config.GetString("public-hostname"); - DnsUseIpv6 = Config.GetBoolean("dns-use-ipv6"); - EnforceIpFamily = RuntimeDetector.IsMono || Config.GetBoolean("enforce-ip-family"); - Hostname = string.IsNullOrEmpty(configHost) ? IPAddress.Any.ToString() : configHost; - PublicHostname = string.IsNullOrEmpty(publicConfigHost) ? Hostname : publicConfigHost; - ServerSocketWorkerPoolSize = ComputeWps(Config.GetConfig("server-socket-worker-pool")); - ClientSocketWorkerPoolSize = ComputeWps(Config.GetConfig("client-socket-worker-pool")); - Port = Config.GetInt("port"); - - // used to provide backwards compatibility with Helios 1.* clients - BackwardsCompatibilityModeEnabled = Config.GetBoolean("enable-backwards-compatibility", false); - } - - /// - /// TBD - /// - public TransportMode TransportMode { get; private set; } - - /// - /// TBD - /// - public bool EnableSsl { get; private set; } - - /// - /// TBD - /// - public TimeSpan ConnectTimeout { get; private set; } - - /// - /// TBD - /// - public long? WriteBufferHighWaterMark { get; private set; } - - /// - /// TBD - /// - public long? WriteBufferLowWaterMark { get; private set; } - - /// - /// TBD - /// - public long? SendBufferSize { get; private set; } - - /// - /// TBD - /// - public long? ReceiveBufferSize { get; private set; } - - /// - /// TBD - /// - public int MaxFrameSize { get; private set; } - - /// - /// TBD - /// - public int Port { get; private set; } - - /// - /// TBD - /// - public int Backlog { get; private set; } - - /// - /// TBD - /// - public bool TcpNoDelay { get; private set; } - - /// - /// TBD - /// - public bool TcpKeepAlive { get; private set; } - - /// - /// TBD - /// - public bool TcpReuseAddr { get; private set; } - - /// - /// TBD - /// - public bool DnsUseIpv6 { get; private set; } - - /// - /// TBD - /// - public bool EnforceIpFamily { get; private set; } - - /// - /// The hostname that this server binds to - /// - public string Hostname { get; private set; } - - /// - /// If different from , this is the public "address" that is bound to the , - /// whereas becomes the physical address that the low-level socket connects to. - /// - public string PublicHostname { get; private set; } - - /// - /// TBD - /// - public int ServerSocketWorkerPoolSize { get; private set; } - - /// - /// TBD - /// - public int ClientSocketWorkerPoolSize { get; private set; } - - /// - /// TBD - /// - public bool BackwardsCompatibilityModeEnabled { get; private set; } - -#region Internal methods - - private long? OptionSize(string s) - { - var bytes = Config.GetByteSize(s); - if (bytes == null || bytes == 0) return null; - if (bytes < 0) throw new ConfigurationException(string.Format("Setting {0} must be 0 or positive", s)); - return bytes; - } - - private int ComputeWps(Config config) - { - return ThreadPoolConfig.ScaledPoolSize(config.GetInt("pool-size-min"), config.GetDouble("pool-size-factor"), - config.GetInt("pool-size-max")); - } - -#endregion - } - /// /// Abstract base class for HeliosTransport - has separate child implementations for TCP / UDP respectively /// abstract class HeliosTransport : Transport { + internal static Config DefaultConfig() + { + var config = ConfigurationFactory.FromResource("Akka.Remote.Transport.Helios.remote.conf"); + return config; + } private readonly IEventLoopGroup _serverEventLoopGroup; private readonly IEventLoopGroup _clientEventLoopGroup; @@ -275,11 +96,12 @@ abstract class HeliosTransport : Transport /// /// TBD /// TBD - protected HeliosTransport(ActorSystem system, Config config) + protected HeliosTransport(ActorSystem system, Config config, string fallbackConfigPath) { Config = config; System = system; - Settings = new HeliosTransportSettings(config); + var fallbackConfig = DefaultConfig(); + Settings = new HeliosTransportSettings(config.WithFallback(fallbackConfig.GetConfig(fallbackConfigPath))); Log = Logging.GetLogger(System, GetType()); _serverEventLoopGroup = new MultithreadEventLoopGroup(Settings.ServerSocketWorkerPoolSize); _clientEventLoopGroup = new MultithreadEventLoopGroup(Settings.ClientSocketWorkerPoolSize); @@ -413,8 +235,8 @@ protected ClientBootstrap ClientFactory(Address remoteAddres) .Option(ChannelOption.ConnectTimeout, Settings.ConnectTimeout) .Option(ChannelOption.AutoRead, false) .PreferredDnsResolutionFamily(addressFamily) - .ChannelFactory(() => Settings.EnforceIpFamily ? - new TcpSocketChannel(addressFamily): + .ChannelFactory(() => Settings.EnforceIpFamily ? + new TcpSocketChannel(addressFamily) : new TcpSocketChannel()) .Handler( new ActionChannelInitializer( @@ -461,8 +283,8 @@ protected ServerBootstrap ServerFactory .ChildOption(ChannelOption.AutoRead, false) .Option(ChannelOption.SoBacklog, Settings.Backlog) .PreferredDnsResolutionFamily(addressFamily) - .ChannelFactory(() => Settings.EnforceIpFamily ? - new TcpServerSocketChannel(addressFamily): + .ChannelFactory(() => Settings.EnforceIpFamily ? + new TcpServerSocketChannel(addressFamily) : new TcpServerSocketChannel()) .ChildHandler( new ActionChannelInitializer( @@ -492,7 +314,7 @@ protected async Task NewServer(EndPoint listenAddress) { return await ServerFactory.BindAsync(listenAddress).ConfigureAwait(false); } - + else throw new NotImplementedException("Haven't implemented UDP transport at this time"); } @@ -620,7 +442,7 @@ public override async Task Shutdown() /// TBD public static EndPoint AddressToSocketAddress(Address address) { - if(address.Port == null) throw new ArgumentException($"address port must not be null: {address}"); + if (address.Port == null) throw new ArgumentException($"address port must not be null: {address}"); EndPoint listenAddress; IPAddress ip; if (IPAddress.TryParse(address.Host, out ip)) @@ -684,7 +506,6 @@ public static string SafeMapIPv6(IPAddress address) return address.ToString(); } -#endregion + #endregion } -} - +} \ No newline at end of file diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransportSettings.cs b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransportSettings.cs new file mode 100644 index 00000000000..ab10faed44a --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/HeliosTransportSettings.cs @@ -0,0 +1,200 @@ +#region copyright +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2017 Akka.NET project +// +// ----------------------------------------------------------------------- +#endregion + +using System; +using System.Net; +using Akka.Actor; +using Akka.Configuration; +using Akka.Dispatch; +using Akka.Util; +using Helios.Logging; + +namespace Akka.Remote.Transport.Helios +{ + /// + /// TBD + /// + internal class HeliosTransportSettings + { + /// + /// TBD + /// + internal readonly Config Config; + + /// + /// TBD + /// + /// TBD + public HeliosTransportSettings(Config config) + { + Config = config; + Init(); + } + static HeliosTransportSettings() + { + // Disable STDOUT logging for Helios in release mode +#if !DEBUG + LoggingFactory.DefaultFactory = new NoOpLoggerFactory(); +#endif + } + + private void Init() + { + //TransportMode + var protocolString = Config.GetString("transport-protocol"); + if (protocolString.Equals("tcp")) TransportMode = new Tcp(); + else if (protocolString.Equals("udp")) TransportMode = new Udp(); + else throw new ConfigurationException(string.Format("Unknown transport transport-protocol='{0}'", protocolString)); + EnableSsl = Config.GetBoolean("enable-ssl"); + ConnectTimeout = Config.GetTimeSpan("connection-timeout"); + WriteBufferHighWaterMark = OptionSize("write-buffer-high-water-mark"); + WriteBufferLowWaterMark = OptionSize("write-buffer-low-water-mark"); + SendBufferSize = OptionSize("send-buffer-size"); + ReceiveBufferSize = OptionSize("receive-buffer-size"); + var size = OptionSize("maximum-frame-size"); + if (size == null || size < 32000) throw new ConfigurationException("Setting 'maximum-frame-size' must be at least 32000 bytes"); + MaxFrameSize = (int)size; + Backlog = Config.GetInt("backlog"); + TcpNoDelay = Config.GetBoolean("tcp-nodelay"); + TcpKeepAlive = Config.GetBoolean("tcp-keepalive"); + TcpReuseAddr = Config.GetBoolean("tcp-reuse-addr"); + var configHost = Config.GetString("hostname"); + var publicConfigHost = Config.GetString("public-hostname"); + DnsUseIpv6 = Config.GetBoolean("dns-use-ipv6"); + EnforceIpFamily = RuntimeDetector.IsMono || Config.GetBoolean("enforce-ip-family"); + Hostname = string.IsNullOrEmpty(configHost) ? IPAddress.Any.ToString() : configHost; + PublicHostname = string.IsNullOrEmpty(publicConfigHost) ? Hostname : publicConfigHost; + ServerSocketWorkerPoolSize = ComputeWps(Config.GetConfig("server-socket-worker-pool")); + ClientSocketWorkerPoolSize = ComputeWps(Config.GetConfig("client-socket-worker-pool")); + Port = Config.GetInt("port"); + + // used to provide backwards compatibility with Helios 1.* clients + BackwardsCompatibilityModeEnabled = Config.GetBoolean("enable-backwards-compatibility", false); + } + + /// + /// TBD + /// + public TransportMode TransportMode { get; private set; } + + /// + /// TBD + /// + public bool EnableSsl { get; private set; } + + /// + /// TBD + /// + public TimeSpan ConnectTimeout { get; private set; } + + /// + /// TBD + /// + public long? WriteBufferHighWaterMark { get; private set; } + + /// + /// TBD + /// + public long? WriteBufferLowWaterMark { get; private set; } + + /// + /// TBD + /// + public long? SendBufferSize { get; private set; } + + /// + /// TBD + /// + public long? ReceiveBufferSize { get; private set; } + + /// + /// TBD + /// + public int MaxFrameSize { get; private set; } + + /// + /// TBD + /// + public int Port { get; private set; } + + /// + /// TBD + /// + public int Backlog { get; private set; } + + /// + /// TBD + /// + public bool TcpNoDelay { get; private set; } + + /// + /// TBD + /// + public bool TcpKeepAlive { get; private set; } + + /// + /// TBD + /// + public bool TcpReuseAddr { get; private set; } + + /// + /// TBD + /// + public bool DnsUseIpv6 { get; private set; } + + /// + /// TBD + /// + public bool EnforceIpFamily { get; private set; } + + /// + /// The hostname that this server binds to + /// + public string Hostname { get; private set; } + + /// + /// If different from , this is the public "address" that is bound to the , + /// whereas becomes the physical address that the low-level socket connects to. + /// + public string PublicHostname { get; private set; } + + /// + /// TBD + /// + public int ServerSocketWorkerPoolSize { get; private set; } + + /// + /// TBD + /// + public int ClientSocketWorkerPoolSize { get; private set; } + + /// + /// TBD + /// + public bool BackwardsCompatibilityModeEnabled { get; private set; } + + #region Internal methods + + private long? OptionSize(string s) + { + var bytes = Config.GetByteSize(s); + if (bytes == null || bytes == 0) return null; + if (bytes < 0) throw new ConfigurationException(string.Format("Setting {0} must be 0 or positive", s)); + return bytes; + } + + private int ComputeWps(Config config) + { + return ThreadPoolConfig.ScaledPoolSize(config.GetInt("pool-size-min"), config.GetDouble("pool-size-factor"), + config.GetInt("pool-size-max")); + } + + #endregion + } +} \ No newline at end of file diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/Properties/AssemblyInfo.cs b/src/contrib/transports/Akka.Remote.Transport.Helios/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..8dd7b700e20 --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Akka.Transports.Helios")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Akka.Transports.Helios")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("29feaabc-e326-450a-9008-b5fcecf0115f")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/packages.config b/src/contrib/transports/Akka.Remote.Transport.Helios/packages.config new file mode 100644 index 00000000000..34f6575afe9 --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/contrib/transports/Akka.Remote.Transport.Helios/remote.conf b/src/contrib/transports/Akka.Remote.Transport.Helios/remote.conf new file mode 100644 index 00000000000..f236db910ba --- /dev/null +++ b/src/contrib/transports/Akka.Remote.Transport.Helios/remote.conf @@ -0,0 +1,142 @@ +akka.remote.helios { + tcp { + # The class given here must implement the akka.remote.transport.Transport + # interface and offer a public constructor which takes two arguments: + # 1) akka.actor.ExtendedActorSystem + # 2) com.typesafe.config.Config + transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote.Transport.Helios" + + # Transport drivers can be augmented with adapters by adding their + # name to the applied-adapters list. The last adapter in the + # list is the adapter immediately above the driver, while + # the first one is the top of the stack below the standard + # Akka protocol + applied-adapters = [] + + transport-protocol = tcp + + # The default remote server port clients should connect to. + # Default is 2552 (AKKA), use 0 if you want a random available port + # This port needs to be unique for each actor system on the same machine. + port = 2552 + + # The hostname or ip to bind the remoting to, + # InetAddress.getLocalHost.getHostAddress is used if empty + hostname = "" + + # If this value is set, this becomes the public address for the actor system on this + # transport, which might be different than the physical ip address (hostname) + # this is designed to make it easy to support private / public addressing schemes + public-hostname = "" + + # If set to true, we will use IPV6 addresses upon DNS resolution for host names. + # Otherwise, we will use IPV4. + dns-use-ipv6 = false + + # If set to true, we will enforce usage of IPV4 or IPV6 addresses upon DNS resolution for host names. + # If dns-use-ipv6 = true, we will use IPV6 enforcement + # Otherwise, we will use IPV4. + # Warning: when ip family is enforced, any connection between IPV4 and IPV6 is impossible + # + # enforce-ip-family setting is used only in some special cases, when default behaviour of + # underlying sockets leads to errors. Typically this occurs when an environment doesn't support + # IPV6 or dual-mode sockets. + # As of 09/21/2016 there are two known cases: running under Mono and in Azure WebApp + # for them we will need enforce-ip-family = true, and for Azure dns-use-ipv6 = false + # This property is always set to true if Mono runtime is detected. + + enforce-ip-family = false + + # Enables SSL support on this transport + enable-ssl = false + + # Enables backwards compatibility with Akka.Remote clients running Helios 1.* + enable-backwards-compatibility = false + + # Sets the connectTimeoutMillis of all outbound connections, + # i.e. how long a connect may take until it is timed out + connection-timeout = 15 s + + # If set to "" then the specified dispatcher + # will be used to accept inbound connections, and perform IO. If "" then + # dedicated threads will be used. + # Please note that the Helios driver only uses this configuration and does + # not read the "akka.remote.use-dispatcher" entry. Instead it has to be + # configured manually to point to the same dispatcher if needed. + use-dispatcher-for-io = "" + + # Sets the high water mark for the in and outbound sockets, + # set to 0b for platform default + write-buffer-high-water-mark = 0b + + # Sets the low water mark for the in and outbound sockets, + # set to 0b for platform default + write-buffer-low-water-mark = 0b + + # Sets the send buffer size of the Sockets, + # set to 0b for platform default + send-buffer-size = 256000b + + # Sets the receive buffer size of the Sockets, + # set to 0b for platform default + receive-buffer-size = 256000b + + # Maximum message size the transport will accept, but at least + # 32000 bytes. + # Please note that UDP does not support arbitrary large datagrams, + # so this setting has to be chosen carefully when using UDP. + # Both send-buffer-size and receive-buffer-size settings has to + # be adjusted to be able to buffer messages of maximum size. + maximum-frame-size = 128000b + + # Sets the size of the connection backlog + backlog = 4096 + + # Enables the TCP_NODELAY flag, i.e. disables Nagle’s algorithm + tcp-nodelay = on + + # Enables TCP Keepalive, subject to the O/S kernel’s configuration + tcp-keepalive = on + + # Enables SO_REUSEADDR, which determines when an ActorSystem can open + # the specified listen port (the meaning differs between *nix and Windows) + # Valid values are "on", "off" and "off-for-windows" + # due to the following Windows bug: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4476378 + # "off-for-windows" of course means that it's "on" for all other platforms + tcp-reuse-addr = on + + # Used to configure the number of I/O worker threads on server sockets + server-socket-worker-pool { + # Min number of threads to cap factor-based number to + pool-size-min = 2 + + # The pool size factor is used to determine thread pool size + # using the following formula: ceil(available processors * factor). + # Resulting size is then bounded by the pool-size-min and + # pool-size-max values. + pool-size-factor = 1.0 + + # Max number of threads to cap factor-based number to + pool-size-max = 2 + } + + # Used to configure the number of I/O worker threads on client sockets + client-socket-worker-pool { + # Min number of threads to cap factor-based number to + pool-size-min = 2 + + # The pool size factor is used to determine thread pool size + # using the following formula: ceil(available processors * factor). + # Resulting size is then bounded by the pool-size-min and + # pool-size-max values. + pool-size-factor = 1.0 + + # Max number of threads to cap factor-based number to + pool-size-max = 2 + } + } + udp = ${akka.remote.helios.tcp} + udp { + transport-protocol = udp + } +} \ No newline at end of file diff --git a/src/core/Akka.Cluster.Tests.MultiNode/QuickRestartSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/QuickRestartSpec.cs index 4d844a25f55..ef9e4851c9f 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/QuickRestartSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/QuickRestartSpec.cs @@ -96,7 +96,7 @@ private void JoinAndRestart() .WithFallback(Sys.Settings.Config)) : ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString($"akka.cluster.roles=[round-{round}]") .WithFallback( - $"akka.remote.helios.tcp.port={Cluster.Get(restartingSystem).SelfAddress.Port}") + $"akka.remote.dot-netty.tcp.port={Cluster.Get(restartingSystem).SelfAddress.Port}") .WithFallback(Sys.Settings.Config)); Log.Info("Restarting node has address {0}", Cluster.Get(restartingSystem).SelfUniqueAddress); Cluster.Get(restartingSystem).JoinSeedNodes(_seedNodes.Value); diff --git a/src/core/Akka.Cluster.Tests.MultiNode/RestartFirstSeedNodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/RestartFirstSeedNodeSpec.cs index 8a0c895233b..8f30312f4f2 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/RestartFirstSeedNodeSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/RestartFirstSeedNodeSpec.cs @@ -69,7 +69,7 @@ public RestartFirstSeedNodeSpec(RestartFirstSeedNodeSpecConfig config) : base(co restartedSeed1System = new Lazy(() => { var localConfig = ConfigurationFactory - .ParseString("akka.remote.helios.tcp.port=" + GetSeedNodes().First().Port) + .ParseString("akka.remote.dot-netty.tcp.port=" + GetSeedNodes().First().Port) .WithFallback(Sys.Settings.Config); return ActorSystem.Create(Sys.Name, localConfig); }); diff --git a/src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs b/src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs index bc006a79f68..26436e99eb4 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs @@ -72,7 +72,7 @@ protected RestartNode3Spec(RestartNode3SpecConfig config) : base(config) secondSystem = new Lazy(() => ActorSystem.Create(Sys.Name, Sys.Settings.Config)); restartedSecondSystem = new Lazy(() => ActorSystem.Create( Sys.Name, - ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + secondUniqueAddress.Address.Port) + ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + secondUniqueAddress.Address.Port) .WithFallback(Sys.Settings.Config))); } diff --git a/src/core/Akka.Cluster.Tests.MultiNode/RestartNodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/RestartNodeSpec.cs index aa1b7a72a56..89de367c49b 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/RestartNodeSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/RestartNodeSpec.cs @@ -64,7 +64,7 @@ protected RestartNodeSpec(RestartNodeSpecConfig config) : base(config) { _config = config; _secondSystem = new Lazy(() => ActorSystem.Create(Sys.Name, Sys.Settings.Config)); - _secondRestartedSystem = new Lazy(() => ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.remote.helios.tcp.port=" + _secondUniqueAddress.Address.Port) + _secondRestartedSystem = new Lazy(() => ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + _secondUniqueAddress.Address.Port) .WithFallback(Sys.Settings.Config))); } diff --git a/src/core/Akka.Cluster.Tests.MultiNode/UnreachableNodeJoinsAgainSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/UnreachableNodeJoinsAgainSpec.cs index 5af8d850020..b349c4512dd 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/UnreachableNodeJoinsAgainSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/UnreachableNodeJoinsAgainSpec.cs @@ -214,7 +214,7 @@ public void AllowFreshNodeWithSameHostAndPortToJoinAgainWhenTheNetworkIsPluggedB Sys.WhenTerminated.Wait(TimeSpan.FromSeconds(10)); // create new ActorSystem with same host:port - var freshSystem = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString(@"akka.remote.helios.tcp{ + var freshSystem = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString(@"akka.remote.dot-netty.tcp{ hostname = "+ victimAddress.Host + @" port = "+ victimAddress.Port + @" }").WithFallback(Sys.Settings.Config)); diff --git a/src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs index c6558e67799..9996082bbbd 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs @@ -50,7 +50,7 @@ public class ClusterDeployerSpec : AkkaSpec cluster.use-role = backend } } - akka.remote.helios.tcp.port = 0"); + akka.remote.dot-netty.tcp.port = 0"); public ClusterDeployerSpec() : base(deployerConf) { } diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs index f73c7abf144..ee5cf50d1ec 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs @@ -18,7 +18,7 @@ public class ClusterDomainEventPublisherSpec : AkkaSpec { const string Config = @" akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0"; + akka.remote.dot-netty.tcp.port = 0"; readonly IActorRef _publisher; static readonly Member aUp = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up); diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs index fb8985b0902..16d82a25553 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs @@ -27,7 +27,7 @@ public class ClusterSpec : AkkaSpec } akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" akka.remote.log-remote-lifecycle-events = off - akka.remote.helios.tcp.port = 0"; + akka.remote.dot-netty.tcp.port = 0"; public IActorRef Self { get { return TestActor; } } @@ -196,7 +196,7 @@ public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed() public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address() { var sys2 = ActorSystem.Create("ClusterSpec2", ConfigurationFactory.ParseString(@"akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0")); + akka.remote.dot-netty.tcp.port = 0")); try { diff --git a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs index c4e929cee14..308735aedad 100644 --- a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs +++ b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs @@ -63,7 +63,7 @@ public ClusterRouterAsk1343BugFixSpec() } } - remote.helios.tcp.port = 0 + remote.dot-netty.tcp.port = 0 }") { } diff --git a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterSupervisorSpec.cs b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterSupervisorSpec.cs index 8150375f086..61e43a256b0 100644 --- a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterSupervisorSpec.cs +++ b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterSupervisorSpec.cs @@ -20,7 +20,7 @@ public class ClusterRouterSupervisorSpec : AkkaSpec public ClusterRouterSupervisorSpec() : base(@" akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0") + akka.remote.dot-netty.tcp.port = 0") { } diff --git a/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs b/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs index ecfa78ce93f..ab445d505c2 100644 --- a/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs +++ b/src/core/Akka.Cluster.Tests/StartupWithOneThreadSpec.cs @@ -24,7 +24,7 @@ public class StartupWithOneThreadSpec : AkkaSpec akka.actor.default-dispatcher.Type = ForkJoinDispatcher akka.actor.default-dispatcher.dedicated-thread-pool.thread-count = 1 akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" - akka.remote.helios.tcp.port = 0 + akka.remote.dot-netty.tcp.port = 0 "); private long _startTime; diff --git a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj index 1b9bf799bbe..cd16d672ddc 100644 --- a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj +++ b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj @@ -73,6 +73,7 @@ ..\..\packages\FsCheck.Xunit.0.4.1.0\lib\net40-Client\FsCheck.Xunit.dll True + @@ -99,9 +100,6 @@ Akka {5DEDDF90-37F0-48D3-A0B0-A5CBD8A7E377} - - True - ..\..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll True diff --git a/src/core/Akka.FSharp.Tests/ApiTests.fs b/src/core/Akka.FSharp.Tests/ApiTests.fs index cad3c27dc4b..73655a66238 100644 --- a/src/core/Akka.FSharp.Tests/ApiTests.fs +++ b/src/core/Akka.FSharp.Tests/ApiTests.fs @@ -52,7 +52,7 @@ type TestUnion2 = // } // } // remote { -// helios.tcp { +// dot-netty.tcp { // port = %i // hostname = localhost // } diff --git a/src/core/Akka.FSharp.Tests/app.config b/src/core/Akka.FSharp.Tests/app.config index d3bd7964d53..9ca771f2452 100644 --- a/src/core/Akka.FSharp.Tests/app.config +++ b/src/core/Akka.FSharp.Tests/app.config @@ -18,7 +18,7 @@ - + diff --git a/src/core/Akka.FSharp/Akka.FSharp.fsproj b/src/core/Akka.FSharp/Akka.FSharp.fsproj index 23ef6d18417..7b0f4d6ba38 100644 --- a/src/core/Akka.FSharp/Akka.FSharp.fsproj +++ b/src/core/Akka.FSharp/Akka.FSharp.fsproj @@ -78,6 +78,7 @@ + ..\..\packages\FSPowerPack.Core.Community.3.0.0.0\Lib\Net40\FSharp.PowerPack.dll @@ -96,9 +97,6 @@ Akka {5DEDDF90-37F0-48D3-A0B0-A5CBD8A7E377} - - True - diff --git a/src/core/Akka.FSharp/Properties/AssemblyInfo.fs b/src/core/Akka.FSharp/Properties/AssemblyInfo.fs index 1eacd3677ba..a264e5e242b 100644 --- a/src/core/Akka.FSharp/Properties/AssemblyInfo.fs +++ b/src/core/Akka.FSharp/Properties/AssemblyInfo.fs @@ -10,9 +10,9 @@ open System.Runtime.InteropServices [] [] [] -[] -[] +[] +[] do () module internal AssemblyVersionInformation = - let [] Version = "1.1.2.0" + let [] Version = "1.1.3.0" diff --git a/src/core/Akka.FSharp/app.config b/src/core/Akka.FSharp/app.config index bb9b825d392..d95ace21f25 100644 --- a/src/core/Akka.FSharp/app.config +++ b/src/core/Akka.FSharp/app.config @@ -4,7 +4,7 @@ - + diff --git a/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj b/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj index 8ac45b5fa8c..40b6574679e 100644 --- a/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj +++ b/src/core/Akka.Persistence.FSharp/Akka.Persistence.FSharp.fsproj @@ -54,6 +54,7 @@ + @@ -70,9 +71,6 @@ Akka {5DEDDF90-37F0-48D3-A0B0-A5CBD8A7E377} - - True -