-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clean up bad outbound ACKs in Akka.Remote #4963
Changes from all commits
4cd0698
8bfe974
cb746fe
63a2934
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1139,7 +1139,7 @@ protected override void PostStop() | |
} | ||
_buffer.Clear(); | ||
|
||
if (_handle != null) _handle.Disassociate(_stopReason); | ||
_handle?.Disassociate(_stopReason); | ||
EventPublisher.NotifyListeners(new DisassociatedEvent(LocalAddress, RemoteAddress, Inbound)); | ||
} | ||
|
||
|
@@ -1219,6 +1219,12 @@ private void Handoff() | |
BecomeWritingOrSendBufferedMessages(); | ||
}); | ||
Receive<EndpointManager.Send>(send => EnqueueInBuffer(send)); | ||
|
||
// Ignore outgoing acks during take over, since we might have | ||
// replaced the handle with a connection to a new, restarted, system | ||
// and the ack might be targeted to the old incarnation. | ||
// Relates to https://github.com/akka/akka/pull/20093 | ||
Receive<OutboundAck>(_ => { }); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -1227,56 +1233,50 @@ private void Handoff() | |
/// <param name="message">TBD</param> | ||
protected override void Unhandled(object message) | ||
{ | ||
if (message is Terminated) | ||
switch (message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just resharper'd this. |
||
{ | ||
var t = message as Terminated; | ||
if (_reader == null || t.ActorRef.Equals(_reader)) | ||
case Terminated t: | ||
{ | ||
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel); | ||
if (_reader == null || t.ActorRef.Equals(_reader)) | ||
{ | ||
PublishAndThrow(new EndpointDisassociatedException("Disassociated"), LogLevel.DebugLevel); | ||
} | ||
|
||
break; | ||
} | ||
} | ||
else if (message is StopReading) | ||
{ | ||
var stop = message as StopReading; | ||
if (_reader != null) | ||
{ | ||
case StopReading stop when _reader != null: | ||
_reader.Tell(stop, stop.ReplyTo); | ||
} | ||
else | ||
{ | ||
break; | ||
case StopReading stop: | ||
// initializing, buffer and take care of it later when buffer is sent | ||
EnqueueInBuffer(message); | ||
EnqueueInBuffer(stop); | ||
break; | ||
case TakeOver takeover: | ||
// Shutdown old reader | ||
_handle.Disassociate("the association was replaced by a new one", _log); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cleaned up the disassociation message here too, so it's more clear when a remote connection is being taken over vs. when a node disassociates due to failure. |
||
_handle = takeover.ProtocolHandle; | ||
takeover.ReplyTo.Tell(new TookOver(Self, _handle)); | ||
Become(Handoff); | ||
break; | ||
case FlushAndStop _: | ||
_stopReason = DisassociateInfo.Shutdown; | ||
Context.Stop(Self); | ||
break; | ||
case OutboundAck ack: | ||
{ | ||
_lastAck = ack.Ack; | ||
if (_ackDeadline.IsOverdue) | ||
TrySendPureAck(); | ||
break; | ||
} | ||
} | ||
else if (message is TakeOver) | ||
{ | ||
var takeover = message as TakeOver; | ||
|
||
// Shutdown old reader | ||
_handle.Disassociate(); | ||
_handle = takeover.ProtocolHandle; | ||
takeover.ReplyTo.Tell(new TookOver(Self, _handle)); | ||
Become(Handoff); | ||
} | ||
else if (message is FlushAndStop) | ||
{ | ||
_stopReason = DisassociateInfo.Shutdown; | ||
Context.Stop(Self); | ||
} | ||
else if (message is OutboundAck) | ||
{ | ||
var ack = message as OutboundAck; | ||
_lastAck = ack.Ack; | ||
if (_ackDeadline.IsOverdue) | ||
TrySendPureAck(); | ||
} | ||
else if (message is AckIdleCheckTimer || message is FlushAndStopTimeout || message is BackoffTimer) | ||
{ | ||
//ignore | ||
} | ||
else | ||
{ | ||
base.Unhandled(message); | ||
case AckIdleCheckTimer _: | ||
case FlushAndStopTimeout _: | ||
case BackoffTimer _: | ||
//ignore | ||
break; | ||
default: | ||
base.Unhandled(message); | ||
break; | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug fix.