-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Akka.Remote socket leak fixes #3764
Conversation
@@ -1099,24 +1127,25 @@ private void InitializeFSM() | |||
} | |||
awh.HandlerListener.ContinueWith(result => result.Result.Notify(disassociateNotification), | |||
TaskContinuationOptions.ExecuteSynchronously); | |||
awh.WrappedHandle.Disassociate(DisassociationReason(@event.Reason), _log); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may have been the source of the leak - failed to disassociate handles when handshake failed at the finish line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documented major changes in PR
@@ -28,6 +28,7 @@ namespace Akka.Remote | |||
public class AkkaProtocolSettings | |||
{ | |||
public AkkaProtocolSettings(Akka.Configuration.Config config) { } | |||
public System.TimeSpan HandshakeTimeout { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added new handshake timeout settings to API, per akka/akka#18542
} | ||
|
||
[Fact] | ||
public void EndpointRegistry_should_keep_refuseUid_after_register_new_Endpoint() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validating improved refuseUid
handling for keeping track of quarantines inside the EndpointRegistry
- per akka/akka#23734
var s = DotNettyTransportSettings.Create(c); | ||
|
||
Assert.Equal(TimeSpan.FromSeconds(15), s.ConnectTimeout); | ||
s.ConnectTimeout.Should().Be(new AkkaProtocolSettings(RARP.For(Sys).Provider.RemoteSettings.Config).HandshakeTimeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate the new ConnectTimeout
behavior.
Deploy(Sys, new Deploy(@"/gonk", new RemoteScope(Addr(_remoteSystem, "tcp")))); | ||
Deploy(Sys, new Deploy(@"/zagzag", new RemoteScope(Addr(_remoteSystem, "udp")))); | ||
|
||
_remote = _remoteSystem.ActorOf(Props.Create<Echo2>(), "echo"); | ||
_here = Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/echo"); | ||
|
||
AtStartup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have to call it here, since we can't cleanly override base.AtStartup
and have it called via base class constructor - throws a NullReferenceException
since _remoteSystem
isn't initialized until concrete constructor finishes being called.
return Directive.Stop; | ||
case HopelessAssociation h: | ||
return Hopeless(h); | ||
case ActorInitializationException i when i.InnerException is HopelessAssociation h2: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New case for handling ReliableDeliverySupervisor
failing quarantine check in its constructor.
.With<Gated>(gated => | ||
{ | ||
if (gated.TimeOfRelease.IsOverdue) CreateAndRegisterWritingEndpoint(gated.RefuseUid).Tell(send); | ||
switch (_endpoints.WritableEndpointWithPolicyFor(recipientAddress)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewrote policy handlers as switch
statements - but otherwise functionality is the same / updated to match JVM.
message.Match() | ||
.With<InboundAssociation>(ia => //need to create an Inbound ProtocolStateActor | ||
{ | ||
switch (message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewrote message handling to use switch
rather than PatternMatch
- went over it 2-3 times to make sure it was done correctly, guided by careful side-by-side review with JVM code and some Akka.Remote.Tests failures.
StartWith(AssociationState.WaitHandshake, d); | ||
}); | ||
InitHandshakeTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Activates handshake timer for explicit connection timeouts.
@akkadotnet/core this is ready to go - please let me know if there are any comments, concerns, or fixes I should make. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but with one question.
* completed port of Properly_quarantine_stashed_inbound_connections * porting akka/akka#23617 * modified AssociationHandle to allow explicit DEBUG logging of disassociation reasons * final pass of EndpointManager updates from akka/akka#23617 * fixed issue with EndpointRegistry quarantine management * updated the reliable delivery supervisor * fixed bug when handling inbound associations * cleaning up AkkaProtocolState actors * porting over OutboundConnection timeout spec * ported InboundTimeout spec * approved new Akka.Remote API additions
No description provided.