Skip to content

Commit

Permalink
Clean up bad outbound ACKs in Akka.Remote (#4963)
Browse files Browse the repository at this point in the history
port of akka/akka#20093

Might be responsible for some quarantines in Akka.Cluster / Akka.Remote when nodes are restarting on identical addresses.
  • Loading branch information
Aaronontheweb authored Apr 21, 2021
1 parent d06eb36 commit c4c6443
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ protected override void PostStop()
}
_buffer.Clear();

if (_handle != null) _handle.Disassociate(_stopReason);
_handle?.Disassociate(_stopReason);
EventPublisher.NotifyListeners(new DisassociatedEvent(LocalAddress, RemoteAddress, Inbound));
}

Expand Down Expand Up @@ -1219,6 +1219,12 @@ private void Handoff()
BecomeWritingOrSendBufferedMessages();
});
Receive<EndpointManager.Send>(send => EnqueueInBuffer(send));

// Ignore outgoing acks during take over, since we might have
// replaced the handle with a connection to a new, restarted, system
// and the ack might be targeted to the old incarnation.
// Relates to https://github.com/akka/akka/pull/20093
Receive<OutboundAck>(_ => { });
}

/// <summary>
Expand All @@ -1227,56 +1233,50 @@ private void Handoff()
/// <param name="message">TBD</param>
protected override void Unhandled(object message)
{
if (message is Terminated)
switch (message)
{
var t = message as Terminated;
if (_reader == null || t.ActorRef.Equals(_reader))
case Terminated t:
{
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel);
if (_reader == null || t.ActorRef.Equals(_reader))
{
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel);
}

break;
}
}
else if (message is StopReading)
{
var stop = message as StopReading;
if (_reader != null)
{
case StopReading stop when _reader != null:
_reader.Tell(stop, stop.ReplyTo);
}
else
{
break;
case StopReading stop:
// initializing, buffer and take care of it later when buffer is sent
EnqueueInBuffer(message);
EnqueueInBuffer(stop);
break;
case TakeOver takeover:
// Shutdown old reader
_handle.Disassociate("the association was replaced by a new one", _log);
_handle = takeover.ProtocolHandle;
takeover.ReplyTo.Tell(new TookOver(Self, _handle));
Become(Handoff);
break;
case FlushAndStop _:
_stopReason = DisassociateInfo.Shutdown;
Context.Stop(Self);
break;
case OutboundAck ack:
{
_lastAck = ack.Ack;
if (_ackDeadline.IsOverdue)
TrySendPureAck();
break;
}
}
else if (message is TakeOver)
{
var takeover = message as TakeOver;

// Shutdown old reader
_handle.Disassociate();
_handle = takeover.ProtocolHandle;
takeover.ReplyTo.Tell(new TookOver(Self, _handle));
Become(Handoff);
}
else if (message is FlushAndStop)
{
_stopReason = DisassociateInfo.Shutdown;
Context.Stop(Self);
}
else if (message is OutboundAck)
{
var ack = message as OutboundAck;
_lastAck = ack.Ack;
if (_ackDeadline.IsOverdue)
TrySendPureAck();
}
else if (message is AckIdleCheckTimer || message is FlushAndStopTimeout || message is BackoffTimer)
{
//ignore
}
else
{
base.Unhandled(message);
case AckIdleCheckTimer _:
case FlushAndStopTimeout _:
case BackoffTimer _:
//ignore
break;
default:
base.Unhandled(message);
break;
}
}

Expand Down

0 comments on commit c4c6443

Please sign in to comment.