diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 5d87c6f6114..a25a5ce799c 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -13,7 +13,6 @@ using System.Linq; using System.Net.Sockets; using System.Runtime.CompilerServices; -using System.Threading; using Akka.Actor; using Akka.Dispatch; using Akka.Event; @@ -108,14 +107,14 @@ enum ConnectionStatus protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option writeCommandsBufferMaxSize) { if (socket == null) throw new ArgumentNullException(nameof(socket)); - + _pullMode = pullMode; _writeCommandsQueue = new PendingSimpleWritesQueue(Log, writeCommandsBufferMaxSize); _traceLogging = tcp.Settings.TraceLogging; - + Tcp = tcp; Socket = socket; - + if (pullMode) SetStatus(ConnectionStatus.ReadingSuspended); } @@ -155,30 +154,26 @@ private Receive WaitingForRegistration(IActorRef commander) // up to this point we've been watching the commander, // but since registration is now complete we only need to watch the handler from here on if (!Equals(register.Handler, commander)) - { - Context.Unwatch(commander); - Context.Watch(register.Handler); - } + SignDeathPact(register.Handler); // will unsign death pact with commander automatically if (_traceLogging) Log.Debug("[{0}] registered as connection handler", register.Handler); var registerInfo = new ConnectionInfo(register.Handler, register.KeepOpenOnPeerClosed, register.UseResumeWriting); - // if we have resumed reading from pullMode while waiting for Register then read - if (_pullMode && !HasStatus(ConnectionStatus.ReadingSuspended)) ResumeReading(); - else if (!_pullMode) ReceiveAsync(); - Context.SetReceiveTimeout(null); Context.Become(Connected(registerInfo)); + // if we are in push mode or already have resumed reading in pullMode while waiting for Register then read + if (!_pullMode || !HasStatus(ConnectionStatus.ReadingSuspended)) ResumeReading(); + // If there is something buffered before we got Register message - put it all to the socket var bufferedWrite = GetNextWrite(); if (bufferedWrite.HasValue) { SetStatus(ConnectionStatus.Sending); DoWrite(registerInfo, bufferedWrite.Value); - } - + } + return true; case ResumeReading _: ClearStatus(ConnectionStatus.ReadingSuspended); return true; case SuspendReading _: SetStatus(ConnectionStatus.ReadingSuspended); return true; @@ -205,7 +200,7 @@ private Receive WaitingForRegistration(IActorRef commander) Log.Warning("Received Write command before Register command. " + "It will be buffered until Register will be received (buffered write size is {0} bytes)", commandSize); } - + return true; default: return false; } @@ -268,7 +263,7 @@ private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeComm AcknowledgeSent(); if (IsWritePending) DoWrite(info, GetAllowedPendingWrite()); - else + else HandleClose(info, closeCommander, closedEvent); return true; case UpdatePendingWriteAndThen updatePendingWrite: @@ -277,7 +272,7 @@ private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeComm if (nextWrite.HasValue) DoWrite(info, nextWrite); - else + else HandleClose(info, closeCommander, closedEvent); return true; case WriteFileFailed fail: HandleError(info.Handler, fail.Cause); return true; @@ -312,7 +307,7 @@ private Receive HandleWriteMessages(ConnectionInfo info) case SocketSent _: // Send ack to sender AcknowledgeSent(); - + // If there is something to send - send it var pendingWrite = GetAllowedPendingWrite(); if (pendingWrite.HasValue) @@ -320,14 +315,14 @@ private Receive HandleWriteMessages(ConnectionInfo info) SetStatus(ConnectionStatus.Sending); DoWrite(info, pendingWrite); } - + // If message is fully sent, notify sender who sent ResumeWriting command if (!IsWritePending && _interestedInResume != null) { _interestedInResume.Tell(WritingResumed.Instance); _interestedInResume = null; } - + return true; case WriteCommand write: if (HasStatus(ConnectionStatus.WritingSuspended)) @@ -356,7 +351,7 @@ private Receive HandleWriteMessages(ConnectionInfo info) DropWrite(info, write); return true; } - + nextWrite = GetNextWrite(headCommands: new []{ (simpleWriteCommand, Sender) }); } else @@ -364,15 +359,15 @@ private Receive HandleWriteMessages(ConnectionInfo info) _writeCommandsQueue.EnqueueSimpleWrites(write, Sender); nextWrite = GetNextWrite(); } - + // If there is something to send and we are allowed to, lets put the next command on the wire if (nextWrite.HasValue) { SetStatus(ConnectionStatus.Sending); DoWrite(info, nextWrite.Value); - } + } } - + return true; case ResumeWriting _: /* @@ -396,7 +391,7 @@ private Receive HandleWriteMessages(ConnectionInfo info) case UpdatePendingWriteAndThen updatePendingWrite: var updatedWrite = updatePendingWrite.RemainingWrite; updatePendingWrite.Work(); - if (updatedWrite.HasValue) + if (updatedWrite.HasValue) DoWrite(info, updatedWrite.Value); return true; case WriteFileFailed fail: @@ -461,7 +456,7 @@ private void AcknowledgeSent() { ackInfo.Commander.Tell(ackInfo.Ack); } - + ClearStatus(ConnectionStatus.Sending); } @@ -538,7 +533,7 @@ private void DoWrite(ConnectionInfo info, Option write) { _pendingAcks.Enqueue(pendingAck); } - + write.Value.DoWrite(info); } @@ -672,7 +667,7 @@ SocketCompleted ResolveMessage(SocketAsyncEventArgs e) throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); } } - + var args = new SocketAsyncEventArgs(); args.UserToken = onCompleteNotificationsReceiver; args.Completed += (sender, e) => @@ -684,7 +679,7 @@ SocketCompleted ResolveMessage(SocketAsyncEventArgs e) return args; } - + protected void ReleaseSocketEventArgs(SocketAsyncEventArgs e) { e.UserToken = null; @@ -700,7 +695,7 @@ protected void ReleaseSocketEventArgs(SocketAsyncEventArgs e) catch (InvalidOperationException) { } e.Dispose(); - + } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -728,6 +723,7 @@ private void Abort() protected void StopWith(CloseInformation closeInfo) { _closedMessage = closeInfo; + UnsignDeathPact(); Context.Stop(Self); } @@ -810,7 +806,7 @@ private Option GetNextWrite(IEnumerable<(SimpleWriteCommand Comman { return CreatePendingBufferWrite(writeCommands); } - + // No more writes out there return Option.None; } @@ -986,7 +982,7 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender) { return EnqueueSimpleWrites(command, sender, out _); } - + /// /// Adds all subcommands stored in provided command. /// Performs buffer size checks @@ -997,7 +993,7 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender) public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender, out int bufferedSize) { bufferedSize = 0; - + foreach (var writeInfo in ExtractFromCommand(command)) { var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize; @@ -1013,10 +1009,10 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender, out int _queue.Enqueue((writeInfo.Command, sender, writeInfo.DataSize)); bufferedSize += writeInfo.DataSize; } - + return true; } - + /// /// Adds all subcommands stored in provided command. /// Performs buffer size checks for all, except first one, that is not buffered @@ -1037,7 +1033,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende first = writeInfo.Command; continue; } - + var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize; if (_maxQueueSizeInBytes.HasValue && _maxQueueSizeInBytes.Value < sizeAfterAppending) { @@ -1061,7 +1057,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende { if (_queue.Count == 0) throw new InvalidOperationException("Write commands queue is empty"); - + var (command, sender, size) = _queue.Dequeue(); _totalSizeInBytes -= size; return (command, sender); @@ -1076,7 +1072,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende while (TryGetNext(out var command)) yield return command; } - + /// /// Gets next command from the queue, if any ///