Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Couple of fixes for the TcpConnection #5817

Merged
merged 2 commits into from
Apr 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using System.Linq;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
Expand Down Expand Up @@ -108,14 +107,14 @@ enum ConnectionStatus
protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option<int> writeCommandsBufferMaxSize)
{
if (socket == null) throw new ArgumentNullException(nameof(socket));

_pullMode = pullMode;
_writeCommandsQueue = new PendingSimpleWritesQueue(Log, writeCommandsBufferMaxSize);
_traceLogging = tcp.Settings.TraceLogging;

Tcp = tcp;
Socket = socket;

if (pullMode) SetStatus(ConnectionStatus.ReadingSuspended);
}

Expand Down Expand Up @@ -155,30 +154,26 @@ private Receive WaitingForRegistration(IActorRef commander)
// up to this point we've been watching the commander,
// but since registration is now complete we only need to watch the handler from here on
if (!Equals(register.Handler, commander))
{
Context.Unwatch(commander);
Context.Watch(register.Handler);
}
SignDeathPact(register.Handler); // will unsign death pact with commander automatically
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


if (_traceLogging) Log.Debug("[{0}] registered as connection handler", register.Handler);

var registerInfo = new ConnectionInfo(register.Handler, register.KeepOpenOnPeerClosed, register.UseResumeWriting);

// if we have resumed reading from pullMode while waiting for Register then read
if (_pullMode && !HasStatus(ConnectionStatus.ReadingSuspended)) ResumeReading();
else if (!_pullMode) ReceiveAsync();

Context.SetReceiveTimeout(null);
Context.Become(Connected(registerInfo));

// if we are in push mode or already have resumed reading in pullMode while waiting for Register then read
if (!_pullMode || !HasStatus(ConnectionStatus.ReadingSuspended)) ResumeReading();

// If there is something buffered before we got Register message - put it all to the socket
var bufferedWrite = GetNextWrite();
if (bufferedWrite.HasValue)
{
SetStatus(ConnectionStatus.Sending);
DoWrite(registerInfo, bufferedWrite.Value);
}
}

return true;
case ResumeReading _: ClearStatus(ConnectionStatus.ReadingSuspended); return true;
case SuspendReading _: SetStatus(ConnectionStatus.ReadingSuspended); return true;
Expand All @@ -205,7 +200,7 @@ private Receive WaitingForRegistration(IActorRef commander)
Log.Warning("Received Write command before Register command. " +
"It will be buffered until Register will be received (buffered write size is {0} bytes)", commandSize);
}

return true;
default: return false;
}
Expand Down Expand Up @@ -268,7 +263,7 @@ private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeComm
AcknowledgeSent();
if (IsWritePending)
DoWrite(info, GetAllowedPendingWrite());
else
else
HandleClose(info, closeCommander, closedEvent);
return true;
case UpdatePendingWriteAndThen updatePendingWrite:
Expand All @@ -277,7 +272,7 @@ private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeComm

if (nextWrite.HasValue)
DoWrite(info, nextWrite);
else
else
HandleClose(info, closeCommander, closedEvent);
return true;
case WriteFileFailed fail: HandleError(info.Handler, fail.Cause); return true;
Expand Down Expand Up @@ -312,22 +307,22 @@ private Receive HandleWriteMessages(ConnectionInfo info)
case SocketSent _:
// Send ack to sender
AcknowledgeSent();

// If there is something to send - send it
var pendingWrite = GetAllowedPendingWrite();
if (pendingWrite.HasValue)
{
SetStatus(ConnectionStatus.Sending);
DoWrite(info, pendingWrite);
}

// If message is fully sent, notify sender who sent ResumeWriting command
if (!IsWritePending && _interestedInResume != null)
{
_interestedInResume.Tell(WritingResumed.Instance);
_interestedInResume = null;
}

return true;
case WriteCommand write:
if (HasStatus(ConnectionStatus.WritingSuspended))
Expand Down Expand Up @@ -356,23 +351,23 @@ private Receive HandleWriteMessages(ConnectionInfo info)
DropWrite(info, write);
return true;
}

nextWrite = GetNextWrite(headCommands: new []{ (simpleWriteCommand, Sender) });
}
else
{
_writeCommandsQueue.EnqueueSimpleWrites(write, Sender);
nextWrite = GetNextWrite();
}

// If there is something to send and we are allowed to, lets put the next command on the wire
if (nextWrite.HasValue)
{
SetStatus(ConnectionStatus.Sending);
DoWrite(info, nextWrite.Value);
}
}
}

return true;
case ResumeWriting _:
/*
Expand All @@ -396,7 +391,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
case UpdatePendingWriteAndThen updatePendingWrite:
var updatedWrite = updatePendingWrite.RemainingWrite;
updatePendingWrite.Work();
if (updatedWrite.HasValue)
if (updatedWrite.HasValue)
DoWrite(info, updatedWrite.Value);
return true;
case WriteFileFailed fail:
Expand Down Expand Up @@ -461,7 +456,7 @@ private void AcknowledgeSent()
{
ackInfo.Commander.Tell(ackInfo.Ack);
}

ClearStatus(ConnectionStatus.Sending);
}

Expand Down Expand Up @@ -538,7 +533,7 @@ private void DoWrite(ConnectionInfo info, Option<PendingWrite> write)
{
_pendingAcks.Enqueue(pendingAck);
}

write.Value.DoWrite(info);
}

Expand Down Expand Up @@ -672,7 +667,7 @@ SocketCompleted ResolveMessage(SocketAsyncEventArgs e)
throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported");
}
}

var args = new SocketAsyncEventArgs();
args.UserToken = onCompleteNotificationsReceiver;
args.Completed += (sender, e) =>
Expand All @@ -684,7 +679,7 @@ SocketCompleted ResolveMessage(SocketAsyncEventArgs e)

return args;
}

protected void ReleaseSocketEventArgs(SocketAsyncEventArgs e)
{
e.UserToken = null;
Expand All @@ -700,7 +695,7 @@ protected void ReleaseSocketEventArgs(SocketAsyncEventArgs e)
catch (InvalidOperationException) { }

e.Dispose();

}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -728,6 +723,7 @@ private void Abort()
protected void StopWith(CloseInformation closeInfo)
{
_closedMessage = closeInfo;
UnsignDeathPact();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Context.Stop(Self);
}

Expand Down Expand Up @@ -810,7 +806,7 @@ private Option<PendingWrite> GetNextWrite(IEnumerable<(SimpleWriteCommand Comman
{
return CreatePendingBufferWrite(writeCommands);
}

// No more writes out there
return Option<PendingWrite>.None;
}
Expand Down Expand Up @@ -986,7 +982,7 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender)
{
return EnqueueSimpleWrites(command, sender, out _);
}

/// <summary>
/// Adds all <see cref="SimpleWriteCommand"/> subcommands stored in provided command.
/// Performs buffer size checks
Expand All @@ -997,7 +993,7 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender)
public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender, out int bufferedSize)
{
bufferedSize = 0;

foreach (var writeInfo in ExtractFromCommand(command))
{
var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize;
Expand All @@ -1013,10 +1009,10 @@ public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender, out int
_queue.Enqueue((writeInfo.Command, sender, writeInfo.DataSize));
bufferedSize += writeInfo.DataSize;
}

return true;
}

/// <summary>
/// Adds all <see cref="SimpleWriteCommand"/> subcommands stored in provided command.
/// Performs buffer size checks for all, except first one, that is not buffered
Expand All @@ -1037,7 +1033,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende
first = writeInfo.Command;
continue;
}

var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize;
if (_maxQueueSizeInBytes.HasValue && _maxQueueSizeInBytes.Value < sizeAfterAppending)
{
Expand All @@ -1061,7 +1057,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende
{
if (_queue.Count == 0)
throw new InvalidOperationException("Write commands queue is empty");

var (command, sender, size) = _queue.Dequeue();
_totalSizeInBytes -= size;
return (command, sender);
Expand All @@ -1076,7 +1072,7 @@ public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sende
while (TryGetNext(out var command))
yield return command;
}

/// <summary>
/// Gets next command from the queue, if any
/// </summary>
Expand Down