Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisence serializer setting #2933

Merged
merged 13 commits into from
Aug 11, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public abstract class BatchingSqlJournalSetup
public QueryConfiguration NamingConventions { get; }

/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
public string DefaultSerializer { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me


/// <summary>
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
/// </summary>
/// <param name="config">The configuration used to configure the journal.</param>
Expand Down Expand Up @@ -281,6 +286,7 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
CircuitBreakerSettings = new CircuitBreakerSettings(config.GetConfig("circuit-breaker"));
ReplayFilterSettings = new ReplayFilterSettings(config.GetConfig("replay-filter"));
NamingConventions = namingConventions;
DefaultSerializer = config.GetString("serializer");
}

/// <summary>
Expand All @@ -302,7 +308,8 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
/// </param>
/// <param name="replayFilterSettings">The settings used when replaying events from database back to the persistent actors.</param>
/// <param name="namingConventions">The naming conventions used by the database to construct valid SQL statements.</param>
protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions)
/// <param name="defaultSerializer">The serializer used when no specific type matching can be found.</param>
protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, string defaultSerializer)
{
ConnectionString = connectionString;
MaxConcurrentOperations = maxConcurrentOperations;
Expand All @@ -314,6 +321,7 @@ protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOper
CircuitBreakerSettings = circuitBreakerSettings;
ReplayFilterSettings = replayFilterSettings;
NamingConventions = namingConventions;
DefaultSerializer = defaultSerializer;
}
}

Expand Down Expand Up @@ -1148,7 +1156,7 @@ protected virtual void WriteEvent(TCommand command, IPersistentRepresentation pe
var manifest = string.IsNullOrEmpty(persistent.Manifest)
? payloadType.TypeQualifiedName()
: persistent.Manifest;
var serializer = _serialization.FindSerializerForType(payloadType);
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);
var binary = serializer.ToBinary(persistent.Payload);

AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId);
Expand All @@ -1174,7 +1182,7 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
var payload = reader[PayloadIndex];

var type = Type.GetType(manifest, true);
var deserializer = _serialization.FindSerializerForType(type);
var deserializer = _serialization.FindSerializerForType(type, Setup.DefaultSerializer);
var deserialized = deserializer.FromBinary((byte[])payload, type);

var persistent = new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public class QueryConfiguration
/// </summary>
public readonly TimeSpan Timeout;

/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
public string DefaultSerializer { get; }

/// <summary>
/// TBD
/// </summary>
Expand All @@ -177,6 +182,7 @@ public class QueryConfiguration
/// <param name="tagsColumnName">TBD</param>
/// <param name="orderingColumnName">TBD</param>
/// <param name="timeout">TBD</param>
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param>
public QueryConfiguration(
string schemaName,
string journalEventsTableName,
Expand All @@ -189,7 +195,8 @@ public QueryConfiguration(
string isDeletedColumnName,
string tagsColumnName,
string orderingColumnName,
TimeSpan timeout)
TimeSpan timeout,
string defaultSerializer)
{
SchemaName = schemaName;
JournalEventsTableName = journalEventsTableName;
Expand All @@ -203,6 +210,7 @@ public QueryConfiguration(
Timeout = timeout;
TagsColumnName = tagsColumnName;
OrderingColumnName = orderingColumnName;
DefaultSerializer = defaultSerializer;
}

/// <summary>
Expand Down Expand Up @@ -658,7 +666,7 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
var payload = reader[PayloadIndex];

var type = Type.GetType(manifest, true);
var deserializer = Serialization.FindSerializerForType(type);
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
var deserialized = deserializer.FromBinary((byte[])payload, type);

return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public class SnapshotStoreSettings
/// </summary>
public bool AutoInitialize { get; private set; }

/// <summary>
/// The default serializer being used if no type match override is specified
/// </summary>
public string DefaultSerializer { get; private set; }

/// <summary>
/// Initializes a new instance of the <see cref="SnapshotStoreSettings"/> class.
/// </summary>
Expand All @@ -130,6 +135,7 @@ public SnapshotStoreSettings(Config config)
SchemaName = config.GetString("schema-name");
TableName = config.GetString("table-name");
AutoInitialize = config.GetBoolean("auto-initialize");
DefaultSerializer = config.GetString("serializer");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if this setting is not specified by the plugin explicitly, it should still be available. See changes in persistence.conf

}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public class QueryConfiguration
/// </summary>
public readonly TimeSpan Timeout;

/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
public readonly string DefaultSerializer;

/// <summary>
/// TBD
/// </summary>
Expand All @@ -113,6 +118,7 @@ public class QueryConfiguration
/// <param name="manifestColumnName">TBD</param>
/// <param name="timestampColumnName">TBD</param>
/// <param name="timeout">TBD</param>
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param>
public QueryConfiguration(
string schemaName,
string snapshotTableName,
Expand All @@ -121,7 +127,8 @@ public QueryConfiguration(
string payloadColumnName,
string manifestColumnName,
string timestampColumnName,
TimeSpan timeout)
TimeSpan timeout,
string defaultSerializer)
{
SchemaName = schemaName;
SnapshotTableName = snapshotTableName;
Expand All @@ -131,6 +138,7 @@ public QueryConfiguration(
ManifestColumnName = manifestColumnName;
TimestampColumnName = timestampColumnName;
Timeout = timeout;
DefaultSerializer = defaultSerializer;
}

/// <summary>
Expand Down Expand Up @@ -309,7 +317,7 @@ DELETE FROM {Configuration.FullSnapshotTableName}
protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType);
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);
AddParameter(command, "@Payload", DbType.Binary, binary);
Expand Down Expand Up @@ -521,7 +529,7 @@ protected virtual SelectedSnapshot ReadSnapshot(DbDataReader reader)
protected object GetSnapshot(DbDataReader reader)
{
var type = Type.GetType(reader.GetString(3), true);
var serializer = Serialization.FindSerializerForType(type);
var serializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
var binary = (byte[])reader[4];

var obj = serializer.FromBinary(binary, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
{
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType);
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Data.Common;
using Microsoft.Data.Sqlite;
using Akka.Configuration;
using Akka.Pattern;
using Akka.Persistence.Sql.Common.Journal;

namespace Akka.Persistence.Sqlite.Journal
Expand All @@ -22,9 +23,9 @@ namespace Akka.Persistence.Sqlite.Journal
public sealed class BatchingSqliteJournalSetup : BatchingSqlJournalSetup
{
/// <summary>
/// TBD
/// Initializes a new instance of the <see cref="BatchingSqliteJournalSetup" /> class.
/// </summary>
/// <param name="config">TBD</param>
/// <param name="config">Config object used to obtain Journal settings</param>
public BatchingSqliteJournalSetup(Config config) : base(config, new QueryConfiguration(
schemaName: null,
journalEventsTableName: config.GetString("table-name"),
Expand All @@ -37,26 +38,34 @@ public sealed class BatchingSqliteJournalSetup : BatchingSqlJournalSetup
isDeletedColumnName: "is_deleted",
tagsColumnName: "tags",
orderingColumnName: "ordering",
timeout: config.GetTimeSpan("connection-timeout")))
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")))
{
}

/// <summary>
/// TBD
/// Initializes a new instance of the <see cref="BatchingSqliteJournalSetup" /> class.
/// </summary>
/// <param name="connectionString">TBD</param>
/// <param name="maxConcurrentOperations">TBD</param>
/// <param name="maxBatchSize">TBD</param>
/// <param name="maxBufferSize">TBD</param>
/// <param name="autoInitialize">TBD</param>
/// <param name="connectionTimeout">TBD</param>
/// <param name="isolationLevel">TBD</param>
/// <param name="circuitBreakerSettings">TBD</param>
/// <param name="replayFilterSettings">TBD</param>
/// <param name="namingConventions">TBD</param>
/// <param name="connectionString">The connection string used to connect to the database.</param>
/// <param name="maxConcurrentOperations">The maximum number of batch operations allowed to be executed at the same time.</param>
/// <param name="maxBatchSize">The maximum size of single batch of operations to be executed over a single <see cref="DbConnection"/>.</param>
/// <param name="maxBufferSize">The maximum size of requests stored in journal buffer.</param>
/// <param name="autoInitialize">
/// If set to <c>true</c>, the journal executes all SQL scripts stored under the
/// <see cref="BatchingSqlJournal{TConnection,TCommand}.Initializers"/> collection prior
/// to starting executing any requests.
/// </param>
/// <param name="connectionTimeout">The maximum time given for executed <see cref="DbCommand"/> to complete.</param>
/// <param name="isolationLevel">The isolation level of transactions used during query execution.</param>
/// <param name="circuitBreakerSettings">
/// The settings used by the <see cref="CircuitBreaker"/> when for executing request batches.
/// </param>
/// <param name="replayFilterSettings">The settings used when replaying events from database back to the persistent actors.</param>
/// <param name="namingConventions">The naming conventions used by the database to construct valid SQL statements.</param>
/// <param name="defaultSerializer">The serializer used when no specific type matching can be found.</param>
public BatchingSqliteJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize,
TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions)
: base(connectionString, maxConcurrentOperations, maxBatchSize, maxBufferSize, autoInitialize, connectionTimeout, isolationLevel, circuitBreakerSettings, replayFilterSettings, namingConventions)
TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, string defaultSerializer)
: base(connectionString, maxConcurrentOperations, maxBatchSize, maxBufferSize, autoInitialize, connectionTimeout, isolationLevel, circuitBreakerSettings, replayFilterSettings, namingConventions, defaultSerializer)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public SqliteJournal(Config journalConfig) : base(journalConfig.WithFallback(Ext
isDeletedColumnName: "is_deleted",
tagsColumnName: "tags",
orderingColumnName: "ordering",
timeout: config.GetTimeSpan("connection-timeout")),
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")),
Context.System.Serialization,
GetTimestampProvider(config.GetString("timestamp-provider")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public SqliteSnapshotStore(Config snapshotConfig) : base(snapshotConfig)
payloadColumnName: "payload",
manifestColumnName: "manifest",
timestampColumnName: "created_at",
timeout: config.GetTimeSpan("connection-timeout")),
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")),
Context.System.Serialization);
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4446,8 +4446,8 @@ namespace Akka.Serialization
public void AddSerializer(Akka.Serialization.Serializer serializer) { }
public object Deserialize(byte[] bytes, int serializerId, System.Type type) { }
public object Deserialize(byte[] bytes, int serializerId, string manifest) { }
public Akka.Serialization.Serializer FindSerializerFor(object obj) { }
public Akka.Serialization.Serializer FindSerializerForType(System.Type objectType) { }
public Akka.Serialization.Serializer FindSerializerFor(object obj, string defaultSerializerName = null) { }
public Akka.Serialization.Serializer FindSerializerForType(System.Type objectType, string defaultSerializerName = null) { }
public static string SerializedActorPath(Akka.Actor.IActorRef actorRef) { }
public static T SerializeWithTransport<T>(Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func<T> action) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\contrib\serializers\Akka.Serialization.Hyperion\Akka.Serialization.Hyperion.csproj" />
<ProjectReference Include="..\Akka.Persistence\Akka.Persistence.csproj" />
<ProjectReference Include="..\Akka.Remote\Akka.Remote.csproj" />
<ProjectReference Include="..\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.Persistence.Tests.Serialization
Expand All @@ -22,12 +23,12 @@ internal class Configs
akka.actor {
serializers {
my-payload = ""Akka.Persistence.Tests.Serialization.MyPayloadSerializer, Akka.Persistence.Tests""
my-payload2 = ""Akka.Persistence.Tests.Serialization.MyPayload2Serializer, Akka.Persistence.Tests""
old-payload = ""Akka.Persistence.Tests.Serialization.OldPayloadSerializer, Akka.Persistence.Tests""
testserializer = ""Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to fix these tests. I switched system default serializer for these tests to Hyperion.
But im not sure if thats a good idea.

Copy link
Member Author

@Danthar Danthar Aug 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact. After talking this over with @alexvaluyskiy we are not sure why we have this Spec at all. It was disabled before. But it does not test any real-world scenario as far as we know.
What are the real-world scenario's for sending the Persistent or AtomicWrite types across the network.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None that I can think of

}
serialization-bindings {
""Akka.Persistence.Tests.Serialization.MyPayload, Akka.Persistence.Tests"" = my-payload
""Akka.Persistence.Tests.Serialization.MyPayload2, Akka.Persistence.Tests"" = my-payload2
""System.Object"" = testserializer
# this entry was used when creating the data for the test
# ""deserialize data when class is removed""
#""Akka.Persistence.Tests.Serialization.OldPayload, Akka.Persistence.Tests"" = old-payload
Expand Down Expand Up @@ -94,7 +95,7 @@ public override int GetHashCode()
}

// TODO: temporary disabled
public abstract class MessageSerializerRemotingSpec : AkkaSpec
public class MessageSerializerRemotingSpec : AkkaSpec
{
internal class LocalActor : ActorBase
{
Expand Down Expand Up @@ -186,7 +187,7 @@ public void MessageSerializer_should_custom_serialize_Persistent_messages_during
// this also verifies serialization of Persistent.Sender,
// because the RemoteActor will reply to the Persistent.Sender
_localActor.Tell(new Persistent(new MyPayload("a"), sender: TestActor));
ExpectMsg("p.a.");
ExpectMsg("pa");
}

[Fact]
Expand All @@ -195,8 +196,10 @@ public void MessageSerializer_should_custom_serialize_AtomicWrite_messages_durin
var p1 = new Persistent(new MyPayload("a"), sender: TestActor);
var p2 = new Persistent(new MyPayload("b"), sender: TestActor);
_localActor.Tell(new AtomicWrite(ImmutableList.Create(new IPersistentRepresentation[] {p1, p2})));
ExpectMsg("p.a.");
ExpectMsg("p.b.");
Within(5.Seconds(), () => {
ExpectMsg("pa");
ExpectMsg("pb");
});
}

[Fact]
Expand Down
Loading