Skip to content

Commit

Permalink
catch NotSerializableException from deserialization, Migrated from #2…
Browse files Browse the repository at this point in the history
…0641

* to be able to introduce new messages and still support rolling upgrades,
  i.e. a cluster of mixed versions
* note that it's only catching NotSerializableException, which we already
  use for unknown serializer ids and class manifests
* note that it is not catching for system messages, since that could result
  in infinite resending

Do not tear down connections on IllegalArgumentException from serializer, Migrated from #24910
  • Loading branch information
zbynek001 committed May 9, 2019
1 parent bc5cc65 commit a9b670d
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;
using Google.Protobuf;
Expand Down Expand Up @@ -157,7 +158,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var factory))
return factory(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{this.GetType()}]");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{this.GetType()}]");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;
using Google.Protobuf;
Expand Down Expand Up @@ -76,7 +77,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var deserializer))
return deserializer(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(ClusterClientMessageSerializer)}");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(ClusterClientMessageSerializer)}");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe.Internal;
using Akka.Remote.Serialization;
Expand Down Expand Up @@ -89,7 +90,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var deserializer))
return deserializer(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(DistributedPubSubMessageSerializer)}");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(DistributedPubSubMessageSerializer)}");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Serialization;

Expand Down Expand Up @@ -73,7 +74,7 @@ public override object FromBinary(byte[] bytes, string manifest)
if (_fromBinaryMap.TryGetValue(manifest, out var mapper))
return mapper(bytes);

throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
throw new SerializationException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.IO;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Util;
using Hyperion;
Expand All @@ -17,7 +18,7 @@ namespace Akka.DistributedData.Serialization
public sealed class ReplicatedDataSerializer : Serializer
{
private readonly Hyperion.Serializer _serializer;

public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
{
var akkaSurrogate =
Expand Down Expand Up @@ -63,10 +64,25 @@ public override byte[] ToBinary(object obj)
/// <returns>The object contained in the array</returns>
public override object FromBinary(byte[] bytes, Type type)
{
using (var ms = new MemoryStream(bytes))
try
{
using (var ms = new MemoryStream(bytes))
{
var res = _serializer.Deserialize(ms);
return res;
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch (NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
var res = _serializer.Deserialize(ms);
return res;
throw new SerializationException(e.Message, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.DistributedData.Internal;
using Akka.Util;
Expand Down Expand Up @@ -120,9 +121,9 @@ private TVal Get(TKey key, int startIndex)
}

#endregion

public static readonly Type WriteAckType = typeof(WriteAck);

private readonly SmallCache<Read, byte[]> readCache;
private readonly SmallCache<Write, byte[]> writeCache;
private readonly Hyperion.Serializer serializer;
Expand Down Expand Up @@ -181,7 +182,7 @@ public override byte[] ToBinary(object obj)
if (obj is Write) return writeCache.GetOrAdd((Write) obj);
if (obj is Read) return readCache.GetOrAdd((Read)obj);
if (obj is WriteAck) return writeAckBytes;

return Serialize(obj);
}

Expand All @@ -198,9 +199,24 @@ private byte[] Serialize(object obj)
public override object FromBinary(byte[] bytes, Type type)
{
if (type == WriteAckType) return WriteAck.Instance;
using (var stream = new MemoryStream(bytes))
try
{
using (var stream = new MemoryStream(bytes))
{
return serializer.Deserialize(stream);
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch (NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
return serializer.Deserialize(stream);
throw new SerializationException(e.Message, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Configuration;
using Akka.Util;
Expand All @@ -33,7 +34,7 @@ public class HyperionSerializer : Serializer
/// Initializes a new instance of the <see cref="HyperionSerializer"/> class.
/// </summary>
/// <param name="system">The actor system to associate with this serializer.</param>
public HyperionSerializer(ExtendedActorSystem system)
public HyperionSerializer(ExtendedActorSystem system)
: this(system, HyperionSerializerSettings.Default)
{
}
Expand All @@ -43,7 +44,7 @@ public HyperionSerializer(ExtendedActorSystem system)
/// </summary>
/// <param name="system">The actor system to associate with this serializer.</param>
/// <param name="config">Configuration passed from related HOCON config path.</param>
public HyperionSerializer(ExtendedActorSystem system, Config config)
public HyperionSerializer(ExtendedActorSystem system, Config config)
: this(system, HyperionSerializerSettings.Create(config))
{
}
Expand All @@ -70,7 +71,7 @@ public HyperionSerializer(ExtendedActorSystem system, HyperionSerializerSettings
preserveObjectReferences: settings.PreserveObjectReferences,
versionTolerance: settings.VersionTolerance,
surrogates: new[] { akkaSurrogate },
knownTypes: provider.GetKnownTypes(),
knownTypes: provider.GetKnownTypes(),
ignoreISerializable:true));
}

Expand Down Expand Up @@ -106,10 +107,25 @@ public override byte[] ToBinary(object obj)
/// <returns>The object contained in the array</returns>
public override object FromBinary(byte[] bytes, Type type)
{
using (var ms = new MemoryStream(bytes))
try
{
var res = _serializer.Deserialize<object>(ms);
return res;
using (var ms = new MemoryStream(bytes))
{
var res = _serializer.Deserialize<object>(ms);
return res;
}
}
catch (TypeLoadException e)
{
throw new SerializationException(e.Message, e);
}
catch(NotSupportedException e)
{
throw new SerializationException(e.Message, e);
}
catch (ArgumentException e)
{
throw new SerializationException(e.Message, e);
}
}

Expand Down Expand Up @@ -169,23 +185,23 @@ public static HyperionSerializerSettings Create(Config config)
}

/// <summary>
/// When true, it tells <see cref="HyperionSerializer"/> to keep
/// When true, it tells <see cref="HyperionSerializer"/> to keep
/// track of references in serialized/deserialized object graph.
/// </summary>
public readonly bool PreserveObjectReferences;

/// <summary>
/// When true, it tells <see cref="HyperionSerializer"/> to encode
/// When true, it tells <see cref="HyperionSerializer"/> to encode
/// a list of currently serialized fields into type manifest.
/// </summary>
public readonly bool VersionTolerance;

/// <summary>
/// A type implementing <see cref="IKnownTypesProvider"/>, that will
/// be used when <see cref="HyperionSerializer"/> is being constructed
/// to provide a list of message types that are supposed to be known
/// implicitly by all communicating parties. Implementing class must
/// provide either a default constructor or a constructor taking
/// A type implementing <see cref="IKnownTypesProvider"/>, that will
/// be used when <see cref="HyperionSerializer"/> is being constructed
/// to provide a list of message types that are supposed to be known
/// implicitly by all communicating parties. Implementing class must
/// provide either a default constructor or a constructor taking
/// <see cref="ExtendedActorSystem"/> as its only parameter.
/// </summary>
public readonly Type KnownTypesProvider;
Expand Down
15 changes: 8 additions & 7 deletions src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Serialization;
Expand All @@ -26,7 +27,7 @@ public class ClusterMessageSerializer : Serializer

public ClusterMessageSerializer(ExtendedActorSystem system) : base(system)
{

_fromBinaryMap = new Dictionary<Type, Func<byte[], object>>
{
[typeof(ClusterHeartbeatSender.Heartbeat)] = bytes => new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
Expand Down Expand Up @@ -87,7 +88,7 @@ public override object FromBinary(byte[] bytes, Type type)
if (_fromBinaryMap.TryGetValue(type, out var factory))
return factory(bytes);

throw new ArgumentException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}");
throw new SerializationException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}");
}

//
Expand Down Expand Up @@ -265,11 +266,11 @@ private static Gossip GossipFrom(Proto.Msg.Gossip gossip)
var roleMapping = gossip.AllRoles.ToList();
var hashMapping = gossip.AllHashes.ToList();

Member MemberFromProto(Proto.Msg.Member member) =>
Member MemberFromProto(Proto.Msg.Member member) =>
Member.Create(
addressMapping[member.AddressIndex],
member.UpNumber,
(MemberStatus)member.Status,
addressMapping[member.AddressIndex],
member.UpNumber,
(MemberStatus)member.Status,
member.RolesIndexes.Select(x => roleMapping[x]).ToImmutableHashSet());

var members = gossip.Members.Select((Func<Proto.Msg.Member, Member>)MemberFromProto).ToImmutableSortedSet(Member.Ordering);
Expand Down Expand Up @@ -342,7 +343,7 @@ private static Proto.Msg.VectorClock VectorClockToProto(VectorClock vectorClock,

private static VectorClock VectorClockFrom(Proto.Msg.VectorClock version, IList<string> hashMapping)
{
return VectorClock.Create(version.Versions.ToImmutableSortedDictionary(version1 =>
return VectorClock.Create(version.Versions.ToImmutableSortedDictionary(version1 =>
VectorClock.Node.FromHash(hashMapping[version1.HashIndex]), version1 => version1.Timestamp));
}

Expand Down
Loading

0 comments on commit a9b670d

Please sign in to comment.