Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Akka.Serializer performance fixes #3532

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions src/core/Akka/Serialization/Serialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Runtime.CompilerServices;
using System.Reflection;
using System.Runtime.Serialization;
using System.Threading;
using Akka.Actor;
using Akka.Util.Internal;
using Akka.Util.Reflection;
Expand All @@ -21,20 +22,36 @@ namespace Akka.Serialization
/// <summary>
/// Serialization information needed for serializing local actor refs.
/// </summary>
internal class Information
internal sealed class Information
{
public Address Address { get; set; }
public Address Address { get; private set; }

public ActorSystem System { get; set; }
public ActorSystem System { get; private set; }

public bool IsSet { get; private set; }

public void Set(Address address, ActorSystem system)
{
IsSet = true;
Address = address;
System = system;
}

public void Clear()
{
IsSet = false;
Address = null;
System = null;
}
}

/// <summary>
/// TBD
/// The Akka.NET Serialization system. Can be used to lookup registered <see cref="Serializer"/>
/// implementations and serialize / deserialize messages directly.
/// </summary>
public class Serialization
{
[ThreadStatic]
private static Information _currentTransportInformation;
private static readonly ThreadLocal<Information> CurrentTransportInformation = new ThreadLocal<Information>(() => new Information(), false);

/// <summary>
/// TBD
Expand All @@ -46,13 +63,17 @@ public class Serialization
/// <returns>TBD</returns>
public static T SerializeWithTransport<T>(ActorSystem system, Address address, Func<T> action)
{
_currentTransportInformation = new Information()
{
System = system,
Address = address
};
CurrentTransportInformation.Value.Set(address, system);
var res = action();
_currentTransportInformation = null;
CurrentTransportInformation.Value.Clear();
return res;
}

public static byte[] SerializeWithTransport(ActorSystem system, Address address, Serializer s, object message)
{
CurrentTransportInformation.Value.Set(address, system);
var res = s.ToBinary(message);
CurrentTransportInformation.Value.Clear();
return res;
}

Expand Down Expand Up @@ -207,8 +228,8 @@ public object Deserialize(byte[] bytes, int serializerId, string manifest)
$"Cannot find serializer with id [{serializerId}]. The most probable reason" +
" is that the configuration entry 'akka.actor.serializers' is not in sync between the two systems.");

if (serializer is SerializerWithStringManifest)
return ((SerializerWithStringManifest)serializer).FromBinary(bytes, manifest);
if (serializer is SerializerWithStringManifest stringManifest)
return stringManifest.FromBinary(bytes, manifest);
if (string.IsNullOrEmpty(manifest))
return serializer.FromBinary(bytes, null);
Type type;
Expand Down Expand Up @@ -295,12 +316,12 @@ public static string SerializedActorPath(IActorRef actorRef)

var path = actorRef.Path;
ExtendedActorSystem originalSystem = null;
if (actorRef is ActorRefWithCell)
if (actorRef is ActorRefWithCell cell)
{
originalSystem = actorRef.AsInstanceOf<ActorRefWithCell>().Underlying.System.AsInstanceOf<ExtendedActorSystem>();
originalSystem = (ExtendedActorSystem)cell.Underlying.System;
}

if (_currentTransportInformation == null)
if (!CurrentTransportInformation.Value.IsSet)
{
if (originalSystem == null)
{
Expand All @@ -316,8 +337,8 @@ public static string SerializedActorPath(IActorRef actorRef)
}

//CurrentTransportInformation exists
var system = _currentTransportInformation.System;
var address = _currentTransportInformation.Address;
var system = CurrentTransportInformation.Value.System;
var address = CurrentTransportInformation.Value.Address;
if (originalSystem == null || originalSystem == system)
{
var res = path.ToSerializationFormatWithAddress(address);
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Serialization/Serializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ protected Serializer(ExtendedActorSystem system)
/// </summary>
/// <param name="address">The address to use when serializing local ActorRef´s</param>
/// <param name="obj">The object to serialize</param>
/// <returns>TBD</returns>
/// <returns>A serialized message</returns>
public byte[] ToBinaryWithAddress(Address address, object obj)
{
return Serialization.SerializeWithTransport(system, address, () => ToBinary(obj));
return Serialization.SerializeWithTransport(system, address, this, obj);
}

/// <summary>
Expand Down