Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix serialization verification problem with Akka.IO messages #4974

8 changes: 4 additions & 4 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,7 @@ namespace Akka.IO
public static Akka.IO.Dns.Resolved Cached(string name, Akka.Actor.ActorSystem system) { }
public override Akka.IO.DnsExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
public static Akka.IO.Dns.Resolved ResolveName(string name, Akka.Actor.ActorSystem system, Akka.Actor.IActorRef sender) { }
public abstract class Command
public abstract class Command : Akka.Actor.INoSerializationVerificationNeeded
{
protected Command() { }
}
Expand Down Expand Up @@ -3685,7 +3685,7 @@ namespace Akka.IO
{
protected Event() { }
}
public abstract class Message
public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded
{
protected Message() { }
}
Expand Down Expand Up @@ -3792,7 +3792,7 @@ namespace Akka.IO
{
protected Event() { }
}
public abstract class Message
public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded
{
protected Message() { }
}
Expand Down Expand Up @@ -3827,7 +3827,7 @@ namespace Akka.IO
public static readonly Akka.IO.UdpConnected.SuspendReading Instance;
}
}
public class UdpConnectedExt : Akka.IO.IOExtension
public class UdpConnectedExt : Akka.IO.IOExtension, Akka.Actor.INoSerializationVerificationNeeded
{
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class AckWithValue : Tcp.Event

public TcpIntegrationSpec(ITestOutputHelper output)
: base($@"akka.loglevel = DEBUG
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.tcp.trace-logging = true
akka.io.tcp.write-commands-queue-max-size = {InternalConnectionActorMaxQueueSize}", output: output)
{ }
Expand Down Expand Up @@ -190,7 +192,7 @@ public void The_TCP_transport_implementation_should_properly_support_connecting_
var targetAddress = new DnsEndPoint("localhost", boundMsg.LocalAddress.AsInstanceOf<IPEndPoint>().Port);
var clientHandler = CreateTestProbe();
Sys.Tcp().Tell(new Tcp.Connect(targetAddress), clientHandler);
clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromMinutes(10));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A socket shouldn't take 10 minutes to connect

clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromSeconds(3));
var clientEp = clientHandler.Sender;
clientEp.Tell(new Tcp.Register(clientHandler));
serverHandler.ExpectMsg<Tcp.Connected>();
Expand Down
8 changes: 5 additions & 3 deletions src/core/Akka.Tests/IO/TcpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ namespace Akka.Tests.IO
public class TcpListenerSpec : AkkaSpec
{
public TcpListenerSpec()
: base(@"akka.io.tcp.register-timeout = 500ms
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.tcp.register-timeout = 500ms
akka.io.tcp.max-received-message-size = 1024
akka.io.tcp.direct-buffer-size = 512
akka.actor.serialize-creators = on
akka.io.tcp.batch-accept-limit = 2
")
akka.io.tcp.batch-accept-limit = 2")
{ }

[Fact]
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class UdpConnectedIntegrationSpec : AkkaSpec

public UdpConnectedIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp-connected.nr-of-selectors = 1
akka.io.udp-connected.direct-buffer-pool-limit = 100
akka.io.udp-connected.direct-buffer-size = 1024
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class UdpIntegrationSpec : AkkaSpec

public UdpIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
Expand Down
10 changes: 7 additions & 3 deletions src/core/Akka.Tests/IO/UdpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
using Akka.IO;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using UdpListener = Akka.IO.UdpListener;

namespace Akka.Tests.IO
{
public class UdpListenerSpec : AkkaSpec
{
public UdpListenerSpec()
: base(@"akka.io.udp.max-channels = unlimited
public UdpListenerSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024")
akka.io.udp.direct-buffer-size = 1024", output)
{ }

[Fact]
Expand Down
16 changes: 13 additions & 3 deletions src/core/Akka/Actor/ActorCell.Children.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
using System.Collections.Immutable;
using System.Text;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Threading;
using Akka.Actor.Internal;
using Akka.Dispatch.SysMsg;
using Akka.Serialization;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -452,6 +454,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
if (_systemImpl.Settings.SerializeAllCreators && !systemService && !(props.Deploy.Scope is LocalScope))
{
var oldInfo = Serialization.Serialization.CurrentTransportInformation;
object propArgument = null;
try
{
if (oldInfo == null)
Expand All @@ -465,17 +468,24 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
{
if (argument != null && !(argument is INoSerializationVerificationNeeded))
{
propArgument = argument;
var serializer = ser.FindSerializerFor(argument);
var bytes = serializer.ToBinary(argument);
var ms = Serialization.Serialization.ManifestFor(serializer, argument);
if(ser.Deserialize(bytes, serializer.Identifier, ms) == null)
if (ser.Deserialize(bytes, serializer.Identifier, ms) == null)
throw new ArgumentException(
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
}
}
}
}
catch (Exception e)
{
throw new SerializationException(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch and re-throw to add a more meaningful exception message to the error

$"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()} for actor type [{props.Type}].",
e);
}
finally
{
Serialization.Serialization.CurrentTransportInformation = oldInfo;
Expand Down
12 changes: 11 additions & 1 deletion src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Dispatch.SysMsg;
using Akka.Event;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.Serialization;
using Akka.Util;
using Assert = System.Diagnostics.Debug;
Expand Down Expand Up @@ -518,7 +519,16 @@ private Envelope SerializeAndDeserialize(Envelope envelope)
if (unwrapped is INoSerializationVerificationNeeded)
return envelope;

var deserializedMsg = SerializeAndDeserializePayload(unwrapped);
object deserializedMsg;
try
{
deserializedMsg = SerializeAndDeserializePayload(unwrapped);
}
catch (Exception e)
{
throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}], Actor type: [{Actor.GetType()}]", e);
}

if (deadLetter != null)
return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender);
return new Envelope(deserializedMsg, envelope.Sender);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Dns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Dns : ExtensionIdProvider<DnsExt>
/// <summary>
/// TBD
/// </summary>
public abstract class Command
public abstract class Command : INoSerializationVerificationNeeded
{ }

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system)

#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down
64 changes: 37 additions & 27 deletions src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private IEnumerable<SocketAsyncEventArgs> Accept(int limit)
{
var self = Self;
var saea = new SocketAsyncEventArgs();
saea.Completed += (s, e) => self.Tell(e);
saea.Completed += (s, e) => self.Tell(new SocketEvent(e));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap naked SocketAsyncEventArgs in a struct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
Self.Tell(new SocketEvent(saea));
yield return saea;
}
}
Expand All @@ -85,34 +85,34 @@ protected override SupervisorStrategy SupervisorStrategy()

protected override bool Receive(object message)
{
if (message is SocketAsyncEventArgs)
switch (message)
{
var saea = message as SocketAsyncEventArgs;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;
case SocketEvent evt:
var saea = evt.Args;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;

if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
return true;
}
var resumeAccepting = message as Tcp.ResumeAccepting;
if (resumeAccepting != null)
{
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;
}
if (message is Tcp.Unbind)
{
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;
if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
return true;

case Tcp.ResumeAccepting resumeAccepting:
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;

case Tcp.Unbind _:
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;

default:
return false;
}
return false;
}

/// <summary>
Expand All @@ -130,5 +130,15 @@ protected override void PostStop()
_log.Debug("Error closing ServerSocketChannel: {0}", e);
}
}

private readonly struct SocketEvent : INoSerializationVerificationNeeded
{
public readonly SocketAsyncEventArgs Args;

public SocketEvent(SocketAsyncEventArgs args)
{
Args = args;
}
}
}
}
4 changes: 2 additions & 2 deletions src/core/Akka/IO/Udp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Udp : ExtensionIdProvider<UdpExt>
{
#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down Expand Up @@ -104,7 +104,7 @@ public override UdpExt CreateExtension(ExtendedActorSystem system)
}

/// <summary>The common interface for <see cref="Command"/> and <see cref="Event"/>.</summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>The common type of all commands supported by the UDP implementation.</summary>
public abstract class Command : Message
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka/IO/UdpConnected.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class UdpConnected : ExtensionIdProvider<UdpConnectedExt>
{
#region internal connection messages

internal abstract class SocketCompleted
internal abstract class SocketCompleted : INoSerializationVerificationNeeded
{
public readonly SocketAsyncEventArgs EventArgs;

Expand Down Expand Up @@ -92,7 +92,7 @@ public override UdpConnectedExt CreateExtension(ExtendedActorSystem system)
/// <summary>
/// The common interface for <see cref="Command"/> and <see cref="Event"/>.
/// </summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>
/// The common type of all commands supported by the UDP implementation.
Expand Down Expand Up @@ -372,7 +372,7 @@ private Disconnected()
/// <summary>
/// TBD
/// </summary>
public class UdpConnectedExt : IOExtension
public class UdpConnectedExt : IOExtension, INoSerializationVerificationNeeded
{
public UdpConnectedExt(ExtendedActorSystem system)
: this(system, UdpSettings.Create(system.Settings.Config.GetConfig("akka.io.udp-connected")))
Expand Down