Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Remote socket leak fixes #3764

Merged
merged 37 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f7f118e
cleaning up comments and code
Aaronontheweb Apr 16, 2019
66efd08
cleaned up some EndpointManager code
Aaronontheweb Apr 24, 2019
1458101
working on porting relevant spec
Aaronontheweb Apr 24, 2019
58f0252
adding some missing test methods to RemotingSpec
Aaronontheweb Apr 25, 2019
31cf147
completed port of Properly_quarantine_stashed_inbound_connections
Aaronontheweb Apr 25, 2019
eb05ef3
fixed endpoint manager issue
Aaronontheweb Apr 25, 2019
43d15aa
fixed issues with ActionHandleEventListener
Aaronontheweb Apr 25, 2019
b3a36e1
fixed NRE
Aaronontheweb Apr 25, 2019
b9ef45c
porting https://github.com/akka/akka/pull/23617
Aaronontheweb Apr 25, 2019
f398de5
still working on updating EndpointRegistry
Aaronontheweb Apr 25, 2019
8870fa8
endpoint manager clean-up
Aaronontheweb Apr 25, 2019
dcfe3b2
finished cleaning up EndpointRegistry
Aaronontheweb Apr 25, 2019
357feb8
more EndpointManager cleanup
Aaronontheweb Apr 25, 2019
4cf89d3
modified AssociationHandle to allow explicit DEBUG logging of disasso…
Aaronontheweb Apr 25, 2019
6febaff
final pass of EndpointManager updates from https://github.com/akka/ak…
Aaronontheweb Apr 25, 2019
2155edc
fixed EndpointRegistrySpec
Aaronontheweb Apr 25, 2019
a625bf0
fixed issue with EndpointRegistry quarantine management
Aaronontheweb Apr 25, 2019
028aec2
updated the reliable delivery supervisor
Aaronontheweb Apr 25, 2019
0529add
cleaned up some remote metrics code
Aaronontheweb Apr 25, 2019
b5eb888
tidying up
Aaronontheweb Apr 25, 2019
82a7076
added additional spec
Aaronontheweb Apr 25, 2019
fba4e11
stashing work
Aaronontheweb Apr 25, 2019
d094fcf
adding connect handshake timeout
Aaronontheweb Apr 25, 2019
ca60831
working on updating ProtocolStateActors
Aaronontheweb Apr 26, 2019
81e960d
WIP
Aaronontheweb Apr 26, 2019
f52847c
finished most AkkaProtocolStateActor ports
Aaronontheweb Apr 26, 2019
8835426
fixed bug when handling inbound associations
Aaronontheweb Apr 26, 2019
aadec2a
cleaning up AkkaProtocolState actors
Aaronontheweb Apr 26, 2019
1731bb5
may have found the source of the leak
Aaronontheweb Apr 26, 2019
74351cd
fixed minor spelling error
Aaronontheweb Apr 26, 2019
a322904
removed debug logging
Aaronontheweb Apr 26, 2019
3a81b86
Merge branch 'dev' into fix-socket-leak
Aaronontheweb Apr 26, 2019
7a8643b
added hack to work around association issues
Aaronontheweb Apr 26, 2019
d3d1576
porting over OutboundConnection timeout spec
Aaronontheweb Apr 29, 2019
26d6683
ported InboundTimeout spec
Aaronontheweb Apr 29, 2019
32de3cd
approved new Akka.Remote API additions
Aaronontheweb Apr 29, 2019
60e84b5
removed TransportAdapter hack and cleaned up RemotingSpecs
Aaronontheweb Apr 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Akka.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
Expand Down
65 changes: 47 additions & 18 deletions src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;

namespace Akka.Remote.Tests
Expand All @@ -29,12 +30,12 @@ public EndpointRegistrySpec()
}

[Fact]
public void EndpointRegistry_must_be_able_to_register_a_writeable_endpoint_and_policy()
public void EndpointRegistry_must_be_able_to_register_a_writable_endpoint_and_policy()
{
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null,null));
Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null));

Assert.IsType<EndpointManager.Pass>(reg.WritableEndpointWithPolicyFor(address1));
Assert.Equal(actorA, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);
Expand Down Expand Up @@ -68,7 +69,7 @@ public void EndpointRegistry_must_be_able_to_register_writable_and_readonly_endp
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 1));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null,null));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null));

Assert.Equal(Tuple.Create(actorA,1), reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorB, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);
Expand All @@ -85,11 +86,10 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
{
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
Assert.Null(reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().RefuseUid);
Assert.False(reg.IsReadOnly(actorA));
Assert.False(reg.IsWritable(actorA));
}
Expand All @@ -107,8 +107,8 @@ public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed(
public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
reg.RegisterWritableEndpoint(address2, actorB, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
reg.MarkAsQuarantined(address2, 42, deadline);
Expand All @@ -117,7 +117,6 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
reg.UnregisterEndpoint(actorB);

Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
Assert.Null(reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().RefuseUid);
Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address2).AsInstanceOf<EndpointManager.Quarantined>().Deadline);
Assert.Equal(42, reg.WritableEndpointWithPolicyFor(address2).AsInstanceOf<EndpointManager.Quarantined>().Uid);
}
Expand All @@ -126,14 +125,13 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
public void EndpointRegistry_should_prune_outdated_Gated_directives_properly()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
reg.RegisterWritableEndpoint(address2, actorB, null);
reg.MarkAsFailed(actorA, Deadline.Now);
var farIntheFuture = Deadline.Now + TimeSpan.FromSeconds(60);
reg.MarkAsFailed(actorB, farIntheFuture);
reg.Prune();

Assert.Null(reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.WasGated>().RefuseUid);
Assert.Equal(farIntheFuture, reg.WritableEndpointWithPolicyFor(address2).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
}

Expand Down Expand Up @@ -176,7 +174,7 @@ public void EndpointRegistry_should_overwrite_Quarantine_policy_with_Pass_on_Reg
Assert.True(reg.IsQuarantined(address1, quarantinedUid));

var writableUid = 43;
reg.RegisterWritableEndpoint(address1, TestActor, writableUid, quarantinedUid);
reg.RegisterWritableEndpoint(address1, TestActor, writableUid);
Assert.True(reg.IsWritable(TestActor));
}

Expand All @@ -187,33 +185,64 @@ public void EndpointRegistry_should_overwrite_Gated_policy_with_Pass_on_Register
var deadline = Deadline.Now + TimeSpan.FromMinutes(30);
var willBeGated = 42;

reg.RegisterWritableEndpoint(address1, TestActor, willBeGated, null);
reg.RegisterWritableEndpoint(address1, TestActor, willBeGated);
Assert.NotNull(reg.WritableEndpointWithPolicyFor(address1));
Assert.True(reg.IsWritable(TestActor));
reg.MarkAsFailed(TestActor, deadline);
Assert.False(reg.IsWritable(TestActor));

var writableUid = 43;
reg.RegisterWritableEndpoint(address1, TestActor, writableUid, willBeGated);
reg.RegisterWritableEndpoint(address1, TestActor, writableUid);
Assert.True(reg.IsWritable(TestActor));
}

[Fact]
public void EndpointRegister_should_not_report_endpoint_as_writeable_if_no_Pass_policy()
public void EndpointRegistry_should_not_report_endpoint_as_writable_if_no_Pass_policy()
{
var reg = new EndpointRegistry();
var deadline = Deadline.Now + TimeSpan.FromMinutes(30);

Assert.False(reg.IsWritable(TestActor)); // no policy

reg.RegisterWritableEndpoint(address1, TestActor, 42, null);
reg.RegisterWritableEndpoint(address1, TestActor, 42);
Assert.True(reg.IsWritable(TestActor)); // pass
reg.MarkAsFailed(TestActor, deadline);
Assert.False(reg.IsWritable(TestActor)); // Gated
reg.RegisterWritableEndpoint(address1, TestActor, 43, 42); // restarted
reg.RegisterWritableEndpoint(address1, TestActor, 43); // restarted
Assert.True(reg.IsWritable(TestActor)); // pass
reg.MarkAsQuarantined(address1, 43, deadline);
Assert.False(reg.HasWriteableEndpointFor(address1)); // Quarantined
Assert.False(reg.HasWritableEndpointFor(address1)); // Quarantined
}

[Fact]
public void EndpointRegistry_should_keep_refuseUid_after_register_new_Endpoint()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validating improved refuseUid handling for keeping track of quarantines inside the EndpointRegistry - per akka/akka#23734

{
var reg = new EndpointRegistry();
var deadline = Deadline.Now + TimeSpan.FromMinutes(30);

reg.RegisterWritableEndpoint(address1, actorA, null);
reg.MarkAsQuarantined(address1, 42, deadline);
reg.RefuseUid(address1).Should().Be(42);
reg.IsQuarantined(address1, 42).Should().BeTrue();

reg.UnregisterEndpoint(actorA);
// Quarantined marker is kept so far
var policy = reg.WritableEndpointWithPolicyFor(address1);
policy.Should().BeOfType<EndpointManager.Quarantined>();
policy.AsInstanceOf<EndpointManager.Quarantined>().Uid.Should().Be(42);
policy.AsInstanceOf<EndpointManager.Quarantined>().Deadline.Should().Be(deadline);

reg.RefuseUid(address1).Should().Be(42);
reg.IsQuarantined(address1, 42).Should().BeTrue();

reg.RegisterWritableEndpoint(address1, actorB, null);
// Quarantined marker is gone
var policy2 = reg.WritableEndpointWithPolicyFor(address1);
policy2.Should().BeOfType<EndpointManager.Pass>();
policy2.AsInstanceOf<EndpointManager.Pass>().Endpoint.Should().Be(actorB);
// but we still have the refuseUid
reg.RefuseUid(address1).Should().Be(42);
reg.IsQuarantined(address1, 42).Should().BeTrue();
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/core/Akka.Remote.Tests/RemoteConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Remote.Transport.DotNetty;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
using System.Net;
using Akka.Remote.Transport;
using static Akka.Util.RuntimeDetector;
using FluentAssertions;

namespace Akka.Remote.Tests
{
Expand All @@ -32,7 +31,7 @@ public RemoteConfigSpec():base(@"
[Fact]
public void Remoting_should_contain_correct_configuration_values_in_ReferenceConf()
{
var remoteSettings = ((RemoteActorRefProvider)((ExtendedActorSystem) Sys).Provider).RemoteSettings;
var remoteSettings = RARP.For(Sys).Provider.RemoteSettings;

Assert.False(remoteSettings.LogReceive);
Assert.False(remoteSettings.LogSend);
Expand Down Expand Up @@ -80,7 +79,7 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
[Fact]
public void Remoting_should_be_able_to_parse_AkkaProtocol_related_config_elements()
{
var settings = new AkkaProtocolSettings(((RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider).RemoteSettings.Config);
var settings = new AkkaProtocolSettings(RARP.For(Sys).Provider.RemoteSettings.Config);

Assert.Equal(typeof(DeadlineFailureDetector), Type.GetType(settings.TransportFailureDetectorImplementationClass));
Assert.Equal(TimeSpan.FromSeconds(4), settings.TransportHeartBeatInterval);
Expand All @@ -90,10 +89,11 @@ public void Remoting_should_be_able_to_parse_AkkaProtocol_related_config_element
[Fact]
public void Remoting_should_contain_correct_heliosTCP_values_in_ReferenceConf()
{
var c = ((RemoteActorRefProvider)((ActorSystemImpl)Sys).Provider).RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var s = DotNettyTransportSettings.Create(c);

Assert.Equal(TimeSpan.FromSeconds(15), s.ConnectTimeout);
s.ConnectTimeout.Should().Be(new AkkaProtocolSettings(RARP.For(Sys).Provider.RemoteSettings.Config).HandshakeTimeout);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate the new ConnectTimeout behavior.

Assert.Null(s.WriteBufferHighWaterMark);
Assert.Null(s.WriteBufferLowWaterMark);
Assert.Equal(256000, s.SendBufferSize.Value);
Expand All @@ -117,7 +117,7 @@ public void Remoting_should_contain_correct_heliosTCP_values_in_ReferenceConf()
public void When_remoting_works_in_Mono_ip_enforcement_should_be_defaulted_to_true()
{
if (!IsMono) return; // skip IF NOT using Mono
var c = ((RemoteActorRefProvider)((ActorSystemImpl)Sys).Provider).RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var s = DotNettyTransportSettings.Create(c);

Assert.True(s.EnforceIpFamily);
Expand All @@ -127,7 +127,7 @@ public void When_remoting_works_in_Mono_ip_enforcement_should_be_defaulted_to_tr
public void When_remoting_works_not_in_Mono_ip_enforcement_should_be_defaulted_to_false()
{
if (IsMono) return; // skip IF using Mono
var c = ((RemoteActorRefProvider)((ActorSystemImpl)Sys).Provider).RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var s = DotNettyTransportSettings.Create(c);

Assert.False(s.EnforceIpFamily);
Expand All @@ -137,7 +137,7 @@ public void When_remoting_works_not_in_Mono_ip_enforcement_should_be_defaulted_t
[Fact]
public void Remoting_should_contain_correct_socket_worker_pool_configuration_values_in_ReferenceConf()
{
var c = ((RemoteActorRefProvider)((ActorSystemImpl)Sys).Provider).RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");

// server-socket-worker-pool
{
Expand All @@ -159,7 +159,7 @@ public void Remoting_should_contain_correct_socket_worker_pool_configuration_val
[Fact]
public void Remoting_should_contain_correct_hostname_values_in_ReferenceConf()
{
var c = ((RemoteActorRefProvider)((ActorSystemImpl)Sys).Provider).RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var s = DotNettyTransportSettings.Create(c);

//Non-specified hostnames should default to IPAddress.Any
Expand Down
Loading