Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up bad outbound ACKs in Akka.Remote #4963

Merged
merged 4 commits into from
Apr 21, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ protected override void PostStop()
}
_buffer.Clear();

if (_handle != null) _handle.Disassociate(_stopReason);
_handle?.Disassociate(_stopReason);
EventPublisher.NotifyListeners(new DisassociatedEvent(LocalAddress, RemoteAddress, Inbound));
}

Expand Down Expand Up @@ -1219,6 +1219,12 @@ private void Handoff()
BecomeWritingOrSendBufferedMessages();
});
Receive<EndpointManager.Send>(send => EnqueueInBuffer(send));

// Ignore outgoing acks during take over, since we might have
// replaced the handle with a connection to a new, restarted, system
// and the ack might be targeted to the old incarnation.
// Relates to https://github.com/akka/akka/pull/20093
Receive<OutboundAck>(_ => { });
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug fix.

}

/// <summary>
Expand All @@ -1227,56 +1233,50 @@ private void Handoff()
/// <param name="message">TBD</param>
protected override void Unhandled(object message)
{
if (message is Terminated)
switch (message)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just ReSharper'd this.

{
var t = message as Terminated;
if (_reader == null || t.ActorRef.Equals(_reader))
case Terminated t:
{
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel);
if (_reader == null || t.ActorRef.Equals(_reader))
{
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel);
}

break;
}
}
else if (message is StopReading)
{
var stop = message as StopReading;
if (_reader != null)
{
case StopReading stop when _reader != null:
_reader.Tell(stop, stop.ReplyTo);
}
else
{
break;
case StopReading stop:
// initializing, buffer and take care of it later when buffer is sent
EnqueueInBuffer(message);
EnqueueInBuffer(stop);
break;
case TakeOver takeover:
// Shutdown old reader
_handle.Disassociate("the association was replaced by a new one", _log);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up the disassociation message here too, so it's clearer whether a remote connection is being taken over or a node is disassociating due to failure.

_handle = takeover.ProtocolHandle;
takeover.ReplyTo.Tell(new TookOver(Self, _handle));
Become(Handoff);
break;
case FlushAndStop _:
_stopReason = DisassociateInfo.Shutdown;
Context.Stop(Self);
break;
case OutboundAck ack:
{
_lastAck = ack.Ack;
if (_ackDeadline.IsOverdue)
TrySendPureAck();
break;
}
}
else if (message is TakeOver)
{
var takeover = message as TakeOver;

// Shutdown old reader
_handle.Disassociate();
_handle = takeover.ProtocolHandle;
takeover.ReplyTo.Tell(new TookOver(Self, _handle));
Become(Handoff);
}
else if (message is FlushAndStop)
{
_stopReason = DisassociateInfo.Shutdown;
Context.Stop(Self);
}
else if (message is OutboundAck)
{
var ack = message as OutboundAck;
_lastAck = ack.Ack;
if (_ackDeadline.IsOverdue)
TrySendPureAck();
}
else if (message is AckIdleCheckTimer || message is FlushAndStopTimeout || message is BackoffTimer)
{
//ignore
}
else
{
base.Unhandled(message);
case AckIdleCheckTimer _:
case FlushAndStopTimeout _:
case BackoffTimer _:
//ignore
break;
default:
base.Unhandled(message);
break;
}
}

Expand Down