diff --git a/src/core/Akka/IO/SocketEventArgsPool.cs b/src/core/Akka/IO/SocketEventArgsPool.cs index 1d2cdc17743..b7f460b1d2f 100644 --- a/src/core/Akka/IO/SocketEventArgsPool.cs +++ b/src/core/Akka/IO/SocketEventArgsPool.cs @@ -70,7 +70,9 @@ public void Release(SocketAsyncEventArgs e) } catch (InvalidOperationException) { - // it can be that for some reason socket is in use and haven't closed yet + // it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks. + e.Dispose(); + active--; } } diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 5b0a5f05b6b..8ff2ecdb35e 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -622,9 +622,11 @@ protected override void PostStop() if (IsWritePending) { pendingWrite.Release(); // we should release ConnectionInfo event args (if they're not released already) - ReleaseSocketAsyncEventArgs(); } + // always try to release SocketAsyncEventArgs to avoid memory leaks + ReleaseSocketAsyncEventArgs(); + if (closedMessage != null) { var interestedInClose = IsWritePending diff --git a/src/core/Akka/IO/TcpOutgoingConnection.cs b/src/core/Akka/IO/TcpOutgoingConnection.cs index 40e85f0803c..08310cd1e16 100644 --- a/src/core/Akka/IO/TcpOutgoingConnection.cs +++ b/src/core/Akka/IO/TcpOutgoingConnection.cs @@ -23,7 +23,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection { private readonly IActorRef _commander; private readonly Tcp.Connect _connect; - + + private SocketAsyncEventArgs _connectArgs; + public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect) : base(tcp, new Socket(SocketType.Stream, ProtocolType.Tcp) { Blocking = false }, connect.PullMode) { @@ -44,8 +46,19 @@ public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connec Context.SetReceiveTimeout(connect.Timeout.Value); //Initiate connection timeout if supplied } + private void ReleaseConnectionSocketArgs() + { + if (_connectArgs != null) + { + Tcp.SocketEventArgsPool.Release(_connectArgs); + _connectArgs = null; + } + } + private void Stop() { + ReleaseConnectionSocketArgs(); + StopWith(new CloseInformation(new HashSet(new[] {_commander}), _connect.FailureMessage)); } @@ -87,6 +100,14 @@ protected override void PreStart() }); } + protected override void PostStop() + { + // always try to release SocketAsyncEventArgs to avoid memory leaks + ReleaseConnectionSocketArgs(); + + base.PostStop(); + } + protected override bool Receive(object message) { throw new NotSupportedException(); @@ -124,13 +145,13 @@ private void Register(IPEndPoint address, IPEndPoint fallbackAddress) { Log.Debug("Attempting connection to [{0}]", address); - var connectArgs = Tcp.SocketEventArgsPool.Acquire(Self); - connectArgs.RemoteEndPoint = address; + _connectArgs = Tcp.SocketEventArgsPool.Acquire(Self); + _connectArgs.RemoteEndPoint = address; // we don't setup buffer here, it shouldn't be necessary just for connection - if (!Socket.ConnectAsync(connectArgs)) + if (!Socket.ConnectAsync(_connectArgs)) Self.Tell(IO.Tcp.SocketConnected.Instance); - Become(Connecting(Tcp.Settings.FinishConnectRetries, connectArgs, fallbackAddress)); + Become(Connecting(Tcp.Settings.FinishConnectRetries, _connectArgs, fallbackAddress)); }); } @@ -145,6 +166,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); Log.Debug("Connection established to [{0}]", _connect.RemoteAddress); + ReleaseConnectionSocketArgs(); AcquireSocketAsyncEventArgs(); CompleteConnect(_commander, _connect.Options);