Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

treat some deserialization errors as transient in remoting #3782

Merged
merged 8 commits into from
Jun 26, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;
using Google.Protobuf;
Expand Down Expand Up @@ -157,7 +158,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var factory))
return factory(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{this.GetType()}]");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{this.GetType()}]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;
using Google.Protobuf;
Expand Down Expand Up @@ -76,7 +77,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var deserializer))
return deserializer(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(ClusterClientMessageSerializer)}");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(ClusterClientMessageSerializer)}");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe.Internal;
using Akka.Remote.Serialization;
Expand Down Expand Up @@ -89,7 +90,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var deserializer))
return deserializer(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(DistributedPubSubMessageSerializer)}");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(DistributedPubSubMessageSerializer)}");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;

Expand Down Expand Up @@ -73,7 +74,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var mapper))
return mapper(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.IO;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Util;
using Hyperion;
Expand All @@ -17,7 +18,7 @@ namespace Akka.DistributedData.Serialization
public sealed class ReplicatedDataSerializer : Serializer
{
private readonly Hyperion.Serializer _serializer;

public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
{
var akkaSurrogate =
Expand Down Expand Up @@ -63,10 +64,25 @@ public override byte[] ToBinary(object obj)
/// <returns>The object contained in the array</returns>
public override object FromBinary(byte[] bytes, Type type)
{
using (var ms = new MemoryStream(bytes))
try
{
using (var ms = new MemoryStream(bytes))
{
var res = _serializer.Deserialize(ms);
return res;
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch (NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
var res = _serializer.Deserialize(ms);
return res;
throw new SerializationException(e.Message, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.DistributedData.Internal;
using Akka.Util;
Expand Down Expand Up @@ -120,9 +121,9 @@ private TVal Get(TKey key, int startIndex)
}

#endregion

public static readonly Type WriteAckType = typeof(WriteAck);

private readonly SmallCache<Read, byte[]> readCache;
private readonly SmallCache<Write, byte[]> writeCache;
private readonly Hyperion.Serializer serializer;
Expand Down Expand Up @@ -181,7 +182,7 @@ public override byte[] ToBinary(object obj)
if (obj is Write) return writeCache.GetOrAdd((Write) obj);
if (obj is Read) return readCache.GetOrAdd((Read)obj);
if (obj is WriteAck) return writeAckBytes;

return Serialize(obj);
}

Expand All @@ -198,9 +199,24 @@ private byte[] Serialize(object obj)
public override object FromBinary(byte[] bytes, Type type)
{
if (type == WriteAckType) return WriteAck.Instance;
using (var stream = new MemoryStream(bytes))
try
{
using (var stream = new MemoryStream(bytes))
{
return serializer.Deserialize(stream);
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch (NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
return serializer.Deserialize(stream);
throw new SerializationException(e.Message, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Configuration;
using Akka.Util;
Expand All @@ -33,7 +34,7 @@ public class HyperionSerializer : Serializer
/// Initializes a new instance of the <see cref="HyperionSerializer"/> class.
/// </summary>
/// <param name="system">The actor system to associate with this serializer.</param>
public HyperionSerializer(ExtendedActorSystem system)
public HyperionSerializer(ExtendedActorSystem system)
: this(system, HyperionSerializerSettings.Default)
{
}
Expand All @@ -43,7 +44,7 @@ public HyperionSerializer(ExtendedActorSystem system)
/// </summary>
/// <param name="system">The actor system to associate with this serializer.</param>
/// <param name="config">Configuration passed from related HOCON config path.</param>
public HyperionSerializer(ExtendedActorSystem system, Config config)
public HyperionSerializer(ExtendedActorSystem system, Config config)
: this(system, HyperionSerializerSettings.Create(config))
{
}
Expand All @@ -70,7 +71,7 @@ public HyperionSerializer(ExtendedActorSystem system, HyperionSerializerSettings
preserveObjectReferences: settings.PreserveObjectReferences,
versionTolerance: settings.VersionTolerance,
surrogates: new[] { akkaSurrogate },
knownTypes: provider.GetKnownTypes(),
knownTypes: provider.GetKnownTypes(),
ignoreISerializable:true));
}

Expand Down Expand Up @@ -106,10 +107,25 @@ public override byte[] ToBinary(object obj)
/// <returns>The object contained in the array</returns>
public override object FromBinary(byte[] bytes, Type type)
{
using (var ms = new MemoryStream(bytes))
try
{
var res = _serializer.Deserialize<object>(ms);
return res;
using (var ms = new MemoryStream(bytes))
{
var res = _serializer.Deserialize<object>(ms);
return res;
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch(NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
throw new SerializationException(e.Message, e);
}
}

Expand Down Expand Up @@ -169,23 +185,23 @@ public static HyperionSerializerSettings Create(Config config)
}

/// <summary>
/// When true, it tells <see cref="HyperionSerializer"/> to keep
/// When true, it tells <see cref="HyperionSerializer"/> to keep
/// track of references in serialized/deserialized object graph.
/// </summary>
public readonly bool PreserveObjectReferences;

/// <summary>
/// When true, it tells <see cref="HyperionSerializer"/> to encode
/// When true, it tells <see cref="HyperionSerializer"/> to encode
/// a list of currently serialized fields into type manifest.
/// </summary>
public readonly bool VersionTolerance;

/// <summary>
/// A type implementing <see cref="IKnownTypesProvider"/>, that will
/// be used when <see cref="HyperionSerializer"/> is being constructed
/// to provide a list of message types that are supposed to be known
/// implicitly by all communicating parties. Implementing class must
/// provide either a default constructor or a constructor taking
/// A type implementing <see cref="IKnownTypesProvider"/>, that will
/// be used when <see cref="HyperionSerializer"/> is being constructed
/// to provide a list of message types that are supposed to be known
/// implicitly by all communicating parties. Implementing class must
/// provide either a default constructor or a constructor taking
/// <see cref="ExtendedActorSystem"/> as its only parameter.
/// </summary>
public readonly Type KnownTypesProvider;
Expand Down
15 changes: 8 additions & 7 deletions src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Serialization;
Expand All @@ -26,7 +27,7 @@ public class ClusterMessageSerializer : Serializer

public ClusterMessageSerializer(ExtendedActorSystem system) : base(system)
{

_fromBinaryMap = new Dictionary<Type, Func<byte[], object>>
{
[typeof(ClusterHeartbeatSender.Heartbeat)] = bytes => new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
Expand Down Expand Up @@ -87,7 +88,7 @@ public override object FromBinary(byte[] bytes, Type type)
if (_fromBinaryMap.TryGetValue(type, out var factory))
return factory(bytes);

throw new ArgumentException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}");
throw new SerializationException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}");
}

//
Expand Down Expand Up @@ -265,11 +266,11 @@ private static Gossip GossipFrom(Proto.Msg.Gossip gossip)
var roleMapping = gossip.AllRoles.ToList();
var hashMapping = gossip.AllHashes.ToList();

Member MemberFromProto(Proto.Msg.Member member) =>
Member MemberFromProto(Proto.Msg.Member member) =>
Member.Create(
addressMapping[member.AddressIndex],
member.UpNumber,
(MemberStatus)member.Status,
addressMapping[member.AddressIndex],
member.UpNumber,
(MemberStatus)member.Status,
member.RolesIndexes.Select(x => roleMapping[x]).ToImmutableHashSet());

var members = gossip.Members.Select((Func<Proto.Msg.Member, Member>)MemberFromProto).ToImmutableSortedSet(Member.Ordering);
Expand Down Expand Up @@ -342,7 +343,7 @@ private static Proto.Msg.VectorClock VectorClockToProto(VectorClock vectorClock,

private static VectorClock VectorClockFrom(Proto.Msg.VectorClock version, IList<string> hashMapping)
{
return VectorClock.Create(version.Versions.ToImmutableSortedDictionary(version1 =>
return VectorClock.Create(version.Versions.ToImmutableSortedDictionary(version1 =>
VectorClock.Node.FromHash(hashMapping[version1.HashIndex]), version1 => version1.Timestamp));
}

Expand Down
Loading