diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 686ff486319..77ee4275174 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1139,7 +1139,7 @@ protected override void PostStop() } _buffer.Clear(); - if (_handle != null) _handle.Disassociate(_stopReason); + _handle?.Disassociate(_stopReason); EventPublisher.NotifyListeners(new DisassociatedEvent(LocalAddress, RemoteAddress, Inbound)); } @@ -1219,6 +1219,12 @@ private void Handoff() BecomeWritingOrSendBufferedMessages(); }); Receive(send => EnqueueInBuffer(send)); + + // Ignore outgoing acks during take over, since we might have + // replaced the handle with a connection to a new, restarted, system + // and the ack might be targeted to the old incarnation. + // Relates to https://github.com/akka/akka/pull/20093 + Receive(_ => { }); } /// @@ -1227,56 +1233,50 @@ private void Handoff() /// TBD protected override void Unhandled(object message) { - if (message is Terminated) + switch (message) { - var t = message as Terminated; - if (_reader == null || t.ActorRef.Equals(_reader)) + case Terminated t: { - PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel); + if (_reader == null || t.ActorRef.Equals(_reader)) + { + PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel); + } + + break; } - } - else if (message is StopReading) - { - var stop = message as StopReading; - if (_reader != null) - { + case StopReading stop when _reader != null: _reader.Tell(stop, stop.ReplyTo); - } - else - { + break; + case StopReading stop: // initializing, buffer and take care of it later when buffer is sent - EnqueueInBuffer(message); + EnqueueInBuffer(stop); + break; + case TakeOver takeover: + // Shutdown old reader + _handle.Disassociate("the association was replaced by a new one", _log); + _handle = takeover.ProtocolHandle; + takeover.ReplyTo.Tell(new TookOver(Self, _handle)); + Become(Handoff); + break; + case FlushAndStop _: + _stopReason = DisassociateInfo.Shutdown; + Context.Stop(Self); + break; + case OutboundAck ack: + { + _lastAck = ack.Ack; + if (_ackDeadline.IsOverdue) + TrySendPureAck(); + break; } - } - else if (message is TakeOver) - { - var takeover = message as TakeOver; - - // Shutdown old reader - _handle.Disassociate(); - _handle = takeover.ProtocolHandle; - takeover.ReplyTo.Tell(new TookOver(Self, _handle)); - Become(Handoff); - } - else if (message is FlushAndStop) - { - _stopReason = DisassociateInfo.Shutdown; - Context.Stop(Self); - } - else if (message is OutboundAck) - { - var ack = message as OutboundAck; - _lastAck = ack.Ack; - if (_ackDeadline.IsOverdue) - TrySendPureAck(); - } - else if (message is AckIdleCheckTimer || message is FlushAndStopTimeout || message is BackoffTimer) - { - //ignore - } - else - { - base.Unhandled(message); + case AckIdleCheckTimer _: + case FlushAndStopTimeout _: + case BackoffTimer _: + //ignore + break; + default: + base.Unhandled(message); + break; } }