diff --git a/Akka.Persistence.Linq2Db.sln b/Akka.Persistence.Linq2Db.sln index a8454370..c56e1177 100644 --- a/Akka.Persistence.Linq2Db.sln +++ b/Akka.Persistence.Linq2Db.sln @@ -43,6 +43,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build-system", "build-syste build-system\windows-release.yaml = build-system\windows-release.yaml EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Linq2Db.Sandbox", "src\Akka.Linq2Db.Sandbox\Akka.Linq2Db.Sandbox.csproj", "{C9D2CCA5-D431-44F7-B8FC-5424655291B6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperApp", "src\Akka.Persistence.Linq2Db.IndexHelperApp\Akka.Persistence.Linq2Db.IndexHelperApp.csproj", "{7B03FDD2-45CC-4F83-B23F-66B4EE82437A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperLib", "src\Akka.Persistence.Linq2Db.IndexHelperLib\Akka.Persistence.Linq2Db.IndexHelperLib.csproj", "{FD78AE77-C95F-4D5D-90EC-923935BAC580}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.Data.Compatibility.Tests", "src\Akka.Persistence.Linq2Db.Data.Compatibility.Tests\Akka.Persistence.Linq2Db.Data.Compatibility.Tests.csproj", "{5240D81C-0F6C-4F4A-85A1-6442C6D6ECBF}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Sql Scripts", "Sql Scripts", "{C4AC49FF-9ECF-4D38-A201-1105BBF7E628}" @@ -113,6 +119,18 @@ Global {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.Build.0 = Release|Any CPU + {C9D2CCA5-D431-44F7-B8FC-5424655291B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C9D2CCA5-D431-44F7-B8FC-5424655291B6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C9D2CCA5-D431-44F7-B8FC-5424655291B6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C9D2CCA5-D431-44F7-B8FC-5424655291B6}.Release|Any CPU.Build.0 = Release|Any CPU + {7B03FDD2-45CC-4F83-B23F-66B4EE82437A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7B03FDD2-45CC-4F83-B23F-66B4EE82437A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7B03FDD2-45CC-4F83-B23F-66B4EE82437A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7B03FDD2-45CC-4F83-B23F-66B4EE82437A}.Release|Any CPU.Build.0 = Release|Any CPU + {FD78AE77-C95F-4D5D-90EC-923935BAC580}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FD78AE77-C95F-4D5D-90EC-923935BAC580}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FD78AE77-C95F-4D5D-90EC-923935BAC580}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FD78AE77-C95F-4D5D-90EC-923935BAC580}.Release|Any CPU.Build.0 = Release|Any CPU {5240D81C-0F6C-4F4A-85A1-6442C6D6ECBF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5240D81C-0F6C-4F4A-85A1-6442C6D6ECBF}.Debug|Any CPU.Build.0 = Debug|Any CPU {5240D81C-0F6C-4F4A-85A1-6442C6D6ECBF}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/README.md b/README.md index dc2bb158..02aaa9e0 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,26 @@ Working: - Classes used in place of ValueTuples in certain areas - We don't have separate Query classes at this time. This can definitely be improved in future - A couple of places around `WriteMessagesAsync` have had their logic moved to facilitate performance (i.e. use of `await` instead of `ContinueWith`) - - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapsho stores. + - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapshot stores. + +- Tag Table Support (Alpha): + - Allows the writing of tags to a separate table to allow for different performance strategies when working with tags. + - Supports Two Tag Table Modes: + - WriteUUID: The tag table and join uses a 'sequential-uuid' type field that will have lower page splits while allowing for good row locality on insert. + - This option is intended for those who want maximum write performance, at the expense of database storage and load. + - OrderingId: Uses the Journal Row's 'ordering' sequential Int64 for the tag table and join. + - This option is intended for those who want more efficient use of the DB's space + - This will result in slower writes, but faster/more efficient reads. + - Provides multiple modes of operation for reads and writes, note that there are separate switches for both read and write! + - CommaSeparatedOnly: The old behavior, where the comma separated tags are held in a column + - CommaSeparatedAndTagTable: Will Read/Write from both the Comma Separated column as well as the Tag Table + - TagTableOnly: will only use the tag table for Read/Write + - 'Live' Migration should be possible via the following flow: + 1. Run Migration scripts to create new columns/tables. + 2. Rolling Deploy your system with Reads and Writes in 'CommaSeparatedAndTagTable' mode. + 3. Rolling deploy your system (again), with Writes now in 'TagTableOnly' mode. + 4. Run Migration App/Script to move existing tags into tag table. + 5. Rolling deploy your system (last one!) with Reads now in 'TagTableOnly' mode. ## Currently Implemented: diff --git a/scripts/MySql/1_Migration_Setup.sql b/scripts/MySql/1_Migration_Setup.sql index df160900..51a17a0a 100644 --- a/scripts/MySql/1_Migration_Setup.sql +++ b/scripts/MySql/1_Migration_Setup.sql @@ -1,13 +1,14 @@ -CREATE TABLE IF NOT EXISTS TagTable( +CREATE TABLE IF NOT EXISTS tags( ordering_id BIGINT NOT NULL, tag NVARCHAR(64) NOT NULL, PRIMARY KEY (ordering_id, tag) ); DROP PROCEDURE IF EXISTS Split; -DELIMITER # + +DELIMITER ?? CREATE PROCEDURE Split() -proc_main: BEGIN +BEGIN DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0; DECLARE Id INT UNSIGNED; @@ -38,7 +39,7 @@ proc_main: BEGIN END IF; IF LENGTH(slice) > 0 THEN - INSERT IGNORE INTO TagTable (ordering_id, tag) VALUES (Id, slice); + INSERT IGNORE INTO tags (ordering_id, tag) VALUES (Id, slice); END IF; SET String = RIGHT(String, LENGTH(String) - idx); @@ -51,5 +52,5 @@ proc_main: BEGIN CLOSE v_cursor; -END proc_main # +END?? DELIMITER ; \ No newline at end of file diff --git a/scripts/PostgreSql/1_Migration_Setup.sql b/scripts/PostgreSql/1_Migration_Setup.sql index d0579804..e05b3d45 100644 --- a/scripts/PostgreSql/1_Migration_Setup.sql +++ b/scripts/PostgreSql/1_Migration_Setup.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS "public"."TagTable"( +CREATE TABLE IF NOT EXISTS "public"."tags"( ordering_id BIGINT NOT NULL, tag VARCHAR(64) NOT NULL, PRIMARY KEY (ordering_id, tag) @@ -10,7 +10,7 @@ BEGIN FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t) LOOP CONTINUE WHEN var_t.t IS NULL OR var_t.t = ''; - INSERT INTO "public"."TagTable" (ordering_id, tag) + INSERT INTO "public"."tags" (ordering_id, tag) VALUES (id, var_t.t) ON CONFLICT DO NOTHING; END LOOP; diff --git a/scripts/SqlServer/1_Migration_Setup.sql b/scripts/SqlServer/1_Migration_Setup.sql index 6c21f4a0..df6227e2 100644 --- a/scripts/SqlServer/1_Migration_Setup.sql +++ b/scripts/SqlServer/1_Migration_Setup.sql @@ -1,7 +1,7 @@ IF NOT EXISTS(SELECT * FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA = N'dbo' AND TABLE_NAME = N'TagTable') + WHERE TABLE_SCHEMA = N'dbo' AND TABLE_NAME = N'tags') BEGIN - CREATE TABLE [dbo].[TagTable]( + CREATE TABLE [dbo].[tags]( ordering_id BIGINT NOT NULL, tag NVARCHAR(64) NOT NULL, PRIMARY KEY (ordering_id, tag) diff --git a/scripts/SqlServer/2_Migration.sql b/scripts/SqlServer/2_Migration.sql index be316d4c..56f9fcd1 100644 --- a/scripts/SqlServer/2_Migration.sql +++ b/scripts/SqlServer/2_Migration.sql @@ -1,10 +1,10 @@ -INSERT INTO [dbo].[TagTable]([ordering_id], [tag]) +INSERT INTO [dbo].[tags]([ordering_id], [tag]) SELECT * FROM ( SELECT a.[Ordering], b.[items] FROM [dbo].[EventJournal] AS a CROSS APPLY [dbo].[Split](a.Tags, ';') b ) AS s([ordering_id], [tag]) WHERE NOT EXISTS ( - SELECT * FROM [dbo].[TagTable] t WITH (updlock) + SELECT * FROM [dbo].[tags] t WITH (updlock) WHERE s.[ordering_id] = t.[ordering_id] AND s.[tag] = t.[tag] ); \ No newline at end of file diff --git a/src/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj b/src/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj new file mode 100644 index 00000000..e03beac7 --- /dev/null +++ b/src/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj @@ -0,0 +1,14 @@ + + + + netcoreapp3.1 + enable + + + + + + + + + diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs index c463c695..6a704284 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerBatchingSqlServerJournalPerfSpec.cs @@ -17,7 +17,7 @@ public DockerBatchingSqlServerJournalPerfSpec(ITestOutputHelper output, SqlServe public static Config InitConfig(SqlServerFixture fixture) { //need to make sure db is created before the tests start - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); var specString = $@" akka.persistence {{ publish-plugin-commands = on @@ -29,7 +29,7 @@ class = ""Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Pers table-name = EventJournal schema-name = dbo auto-initialize = on - connection-string = ""{DockerDbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" }} }} }}"; diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs index 1f2bff44..aa054f33 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests/SqlCommon/DockerSqlServerJournalPerfSpec.cs @@ -16,7 +16,7 @@ public DockerSqlServerJournalPerfSpec(ITestOutputHelper output, SqlServerFixture public static Config InitConfig(SqlServerFixture fixture) { //need to make sure db is created before the tests start - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); var specString = $@" akka.persistence {{ publish-plugin-commands = on @@ -28,7 +28,7 @@ class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence. table-name = EventJournal schema-name = dbo auto-initialize = on - connection-string = ""{DockerDbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" }} }} }}"; diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbPostgreSQLJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbPostgreSQLJournalPerfSpec.cs index 7d6bfec7..4d4471e3 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbPostgreSQLJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbPostgreSQLJournalPerfSpec.cs @@ -46,9 +46,8 @@ public DockerLinq2DbPostgreSqlJournalPerfSpec(ITestOutputHelper output, PostgreSqlFixture fixture) : base(InitConfig(fixture), "postgresperf", output,40, eventsCount: TestConstants.DockerNumMessages) { - var extension = Linq2DbPersistence.Get(Sys); - var config = Create(DockerDbUtils.ConnectionString) - .WithFallback(extension.DefaultConfig) + var config = Create(SqlServerDbUtils.ConnectionString) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db"); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig(config)); using var conn = connFactory.GetConnection(); diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs index 2576017a..882c0fea 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.DockerTests/Docker/Linq2Db/DockerLinq2DbSqlServerJournalPerfSpec.cs @@ -49,9 +49,8 @@ public DockerLinq2DbSqlServerJournalPerfSpec(ITestOutputHelper output, SqlServerFixture fixture) : base(InitConfig(fixture), "sqlserverperf", output,40, eventsCount: TestConstants.DockerNumMessages) { - var extension = Linq2DbPersistence.Get(Sys); - var config = Create(DockerDbUtils.ConnectionString) - .WithFallback(extension.DefaultConfig) + var config = Create(SqlServerDbUtils.ConnectionString) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db"); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig(config)); using var conn = connFactory.GetConnection(); @@ -68,14 +67,14 @@ public DockerLinq2DbSqlServerJournalPerfSpec(ITestOutputHelper output, public static Config InitConfig(SqlServerFixture fixture) { //need to make sure db is created before the tests start - DbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); - return Create(DbUtils.ConnectionString); + return Create(SqlServerDbUtils.ConnectionString); } protected override void Dispose(bool disposing) { base.Dispose(disposing); - DbUtils.Clean(); + SqlServerDbUtils.Clean(); } [Fact] diff --git a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Linq2Db/MSSQLiteLinq2DbJournalPerfSpec.cs b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Linq2Db/MSSQLiteLinq2DbJournalPerfSpec.cs index 3acce59c..752b01c0 100644 --- a/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Linq2Db/MSSQLiteLinq2DbJournalPerfSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Benchmark.Tests/Linq2Db/MSSQLiteLinq2DbJournalPerfSpec.cs @@ -34,13 +34,11 @@ public static void InitWalForFileDb() public MsSqliteLinq2DbJournalPerfSpec(ITestOutputHelper output) : base(SqLiteJournalSpecConfig.Create(ConnString, ProviderName.SQLiteMS), "SqliteJournalSpec", output,eventsCount: TestConstants.NumMessages) { - var extension = Linq2DbPersistence.Get(Sys); - HeldSqliteConnection.Open(); //InitWALForFileDb(); var conf = new JournalConfig( SqLiteJournalSpecConfig.Create(ConnString, ProviderName.SQLiteMS) - .WithFallback(extension.DefaultConfig) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db")); var connFactory = new AkkaPersistenceDataConnectionFactory(conf); diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj index c3e21ca1..1d0ec8ff 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Akka.Persistence.Linq2Db.Compatibility.DockerTests.csproj @@ -2,13 +2,9 @@ - false - Akka.Persistence.Linq2Db.CompatibilityTests.Docker - netcoreapp3.1 - @@ -32,16 +28,4 @@ - - - - - - - - - - - - diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonJournalCompatibilitySpec.cs deleted file mode 100644 index c028fe6a..00000000 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonJournalCompatibilitySpec.cs +++ /dev/null @@ -1,105 +0,0 @@ -using System; -using Akka.Configuration; -using Akka.Persistence.Sql.Linq2Db.Config; -using Akka.Persistence.Sql.Linq2Db.Db; -using Akka.Persistence.Sql.Linq2Db.Journal; -using Akka.Persistence.Sql.Linq2Db.Journal.Types; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; -using LinqToDB; -using Xunit; -using Xunit.Abstractions; - -namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres -{ - [Collection("PostgreSQLSpec")] - public class DockerLinq2DbPostgreSqlSqlCommonJournalCompatibilitySpec : SqlCommonJournalCompatibilitySpec - { - public static string _journalBaseConfig = @" - akka.persistence {{ - publish-plugin-commands = on - journal {{ - plugin = ""akka.persistence.journal.testspec"" - testspec {{ - class = ""{0}"" - #plugin-dispatcher = ""akka.actor.default-dispatcher"" - plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - - connection-string = ""{1}"" -#connection-string = ""FullUri=file:test.db&cache=shared"" - provider-name = """ + LinqToDB.ProviderName.PostgreSQL95 + @""" - use-clone-connection = true - table-compatibility-mode = ""postgres"" - tables.journal {{ - auto-init = true - table-name = ""{2}"" - schema-name = ""public"" - metadata-table-name = ""{3}"" - }} - }} - postgresql {{ - class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" - #plugin-dispatcher = ""akka.actor.default-dispatcher"" - plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - table-name = ""{2}"" - metadata-table-name = ""{3}"" - schema-name = public - auto-initialize = on - connection-string = ""{1}"" - }} - }} - }} - "; - - public static Configuration.Config Create(string connString) - { - return ConfigurationFactory.ParseString( - string.Format(_journalBaseConfig, - typeof(Linq2DbWriteJournal).AssemblyQualifiedName, - connString,"event_journal","metadata")); - } - - protected override Configuration.Config Config { get; } - - protected override string OldJournal => - "akka.persistence.journal.postgresql"; - - protected override string NewJournal => - "akka.persistence.journal.testspec"; - - - public DockerLinq2DbPostgreSqlSqlCommonJournalCompatibilitySpec(ITestOutputHelper output, - PostgreSQLFixture fixture) : base( output) - { - //DebuggingHelpers.SetupTraceDump(output); - Config = InitConfig(fixture); - var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig(Create(DockerDbUtils.ConnectionString).GetConfig("akka.persistence.journal.testspec"))); - using (var conn = connFactory.GetConnection()) - { - try - { - conn.GetTable().Delete(); - } - catch (Exception e) - { - } - - } - } - - public static Configuration.Config InitConfig(PostgreSQLFixture fixture) - { - //need to make sure db is created before the tests start - //DbUtils.Initialize(fixture.ConnectionString); - - - return Create(fixture.ConnectionString); - } - protected void Dispose(bool disposing) - { - //base.Dispose(disposing); -// DbUtils.Clean(); - } - - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonSnapshotCompatibilitySpec.cs deleted file mode 100644 index ab1cda4e..00000000 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/DockerLinq2DbPostgreSqlSqlCommonSnapshotCompatibilitySpec.cs +++ /dev/null @@ -1,112 +0,0 @@ -// //----------------------------------------------------------------------- -// // -// // Copyright (C) 2009-2020 Lightbend Inc. -// // Copyright (C) 2013-2020 .NET Foundation -// // -// //----------------------------------------------------------------------- - -using System; -using Akka.Configuration; -using Akka.Persistence.PostgreSql.Snapshot; -using Akka.Persistence.Sql.Linq2Db.Config; -using Akka.Persistence.Sql.Linq2Db.Db; -using Akka.Persistence.Sql.Linq2Db.Snapshot; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; -using LinqToDB; -using Xunit; -using Xunit.Abstractions; - -namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres -{ - [Collection("PostgreSQLSpec")] - public class DockerLinq2DbPostgreSqlSqlCommonSnapshotCompatibilitySpec : SqlCommonSnapshotCompatibilitySpec - { - public static string _snapshotBaseConfig = @" - akka.persistence {{ - publish-plugin-commands = on - snapshot-store {{ - plugin = ""akka.persistence.snapshot-store.testspec"" - testspec {{ - class = ""{0}"" - #plugin-dispatcher = ""akka.actor.default-dispatcher"" - plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - - connection-string = ""{1}"" -#connection-string = ""FullUri=file:test.db&cache=shared"" - provider-name = """ + LinqToDB.ProviderName.PostgreSQL95 + @""" - use-clone-connection = true - table-compatibility-mode = ""postgres"" - tables.snapshot {{ - auto-init = true - table-name = ""{2}"" - schema-name = ""public"" - metadata-table-name = ""{3}"" - }} - }} - postgresql {{ - class = """+typeof(PostgreSqlSnapshotStore).AssemblyQualifiedName +@""" - #plugin-dispatcher = ""akka.actor.default-dispatcher"" - plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - table-name = ""{2}"" - metadata-table-name = ""{3}"" - schema-name = public - auto-initialize = on - connection-string = ""{1}"" - }} - }} - }} - "; - - public static Configuration.Config Create(string connString) - { - return ConfigurationFactory.ParseString( - string.Format(_snapshotBaseConfig, - typeof(Linq2DbSnapshotStore).AssemblyQualifiedName, - connString,"snapshot_compat","metadata")); - } - - protected override Configuration.Config Config { get; } - - protected override string OldSnapshot => - "akka.persistence.snapshot-store.postgresql"; - - protected override string NewSnapshot => - "akka.persistence.snapshot-store.testspec"; - - - public DockerLinq2DbPostgreSqlSqlCommonSnapshotCompatibilitySpec(ITestOutputHelper output, - PostgreSQLFixture fixture) : base( output) - { - //DebuggingHelpers.SetupTraceDump(output); - Config = InitConfig(fixture); - var connFactory = new AkkaPersistenceDataConnectionFactory(new SnapshotConfig(Create(DockerDbUtils.ConnectionString).GetConfig("akka.persistence.snapshot-store.testspec"))); - using (var conn = connFactory.GetConnection()) - { - try - { - conn.GetTable().Delete(); - } - catch (Exception e) - { - } - - } - } - - public static Configuration.Config InitConfig(PostgreSQLFixture fixture) - { - //need to make sure db is created before the tests start - //DbUtils.Initialize(fixture.ConnectionString); - - - return Create(fixture.ConnectionString); - } - protected void Dispose(bool disposing) - { - //base.Dispose(disposing); -// DbUtils.Clean(); - } - - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSQLSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSQLSpecsFixture.cs deleted file mode 100644 index 480958ba..00000000 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSQLSpecsFixture.cs +++ /dev/null @@ -1,11 +0,0 @@ -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; -using Xunit; - -namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres -{ - [CollectionDefinition("PostgreSQLSpec")] - public sealed class PostgreSQLSpecsFixture : ICollectionFixture - { - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonJournalCompatibilitySpec.cs new file mode 100644 index 00000000..872a32dc --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonJournalCompatibilitySpec.cs @@ -0,0 +1,27 @@ +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlCommonJournalCompatibilitySpec : SqlCommonJournalCompatibilitySpec + { + protected override Config Config { get; } + + protected override string OldJournal => + "akka.persistence.journal.postgresql"; + + protected override string NewJournal => + "akka.persistence.journal.linq2db"; + + public PostgreSqlCommonJournalCompatibilitySpec(ITestOutputHelper output, PostgreSqlFixture fixture) + : base( output) + { + PostgreDbUtils.Initialize(fixture); + Config = PostgreSqlCompatibilitySpecConfig.InitJournalConfig("event_journal", "metadata"); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonSnapshotCompatibilitySpec.cs new file mode 100644 index 00000000..d57ad440 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCommonSnapshotCompatibilitySpec.cs @@ -0,0 +1,38 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2020 Lightbend Inc. +// // Copyright (C) 2013-2020 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Configuration; +using Akka.Persistence.PostgreSql.Snapshot; +using Akka.Persistence.Sql.Linq2Db.Config; +using Akka.Persistence.Sql.Linq2Db.Db; +using Akka.Persistence.Sql.Linq2Db.Snapshot; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using LinqToDB; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlCommonSnapshotCompatibilitySpec : SqlCommonSnapshotCompatibilitySpec + { + protected override Config Config => PostgreSqlCompatibilitySpecConfig.InitSnapshotConfig("snapshot_store"); + + protected override string OldSnapshot => + "akka.persistence.snapshot-store.postgresql"; + + protected override string NewSnapshot => + "akka.persistence.snapshot-store.linq2db"; + + public PostgreSqlCommonSnapshotCompatibilitySpec(ITestOutputHelper output, PostgreSqlFixture fixture) + : base( output) + { + PostgreDbUtils.Initialize(fixture); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCompatibilitySpecConfig.cs new file mode 100644 index 00000000..2d863ed6 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlCompatibilitySpecConfig.cs @@ -0,0 +1,90 @@ +using Akka.Configuration; +using Akka.Persistence.PostgreSql; +using Akka.Persistence.PostgreSql.Journal; +using Akka.Persistence.PostgreSql.Snapshot; +using Akka.Persistence.Sql.Linq2Db; +using Akka.Persistence.Sql.Linq2Db.Journal; +using Akka.Persistence.Sql.Linq2Db.Snapshot; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; + +namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres +{ + public class PostgreSqlCompatibilitySpecConfig + { + public static Config InitSnapshotConfig(string tableName) + { + var specString = $@" +akka.persistence {{ + publish-plugin-commands = on + snapshot-store {{ + postgresql {{ + class = ""{typeof(PostgreSqlSnapshotStore).AssemblyQualifiedName}"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + connection-string = ""{PostgreDbUtils.ConnectionString}"" + connection-timeout = 30s + schema-name = public + table-name = {tableName} + auto-initialize = on + sequential-access = off + }} + + linq2db {{ + class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + connection-string = ""{PostgreDbUtils.ConnectionString}"" + provider-name = {LinqToDB.ProviderName.PostgreSQL95} + table-mapping = postgresql + auto-initialize = true + postgresql {{ + snapshot {{ + table-name = ""{tableName}"" + }} + }} + }} + }} +}}"; + + return ConfigurationFactory.ParseString(specString); + } + + public static Config InitJournalConfig(string tableName, string metadataTableName) + { + var specString = $@" +akka.persistence {{ + publish-plugin-commands = on + journal {{ + postgresql {{ + class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + connection-string = ""{PostgreDbUtils.ConnectionString}"" + connection-timeout = 30s + schema-name = public + table-name = ""{tableName}"" + metadata-table-name = ""{metadataTableName}"" + auto-initialize = on + }} + + linq2db {{ + class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" + plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" + connection-string = ""{PostgreDbUtils.ConnectionString}"" + provider-name = ""{LinqToDB.ProviderName.PostgreSQL95}"" + parallelism = 3 + table-mapping = postgresql + auto-initialize = true + postgresql {{ + journal {{ + table-name = ""{tableName}"" + }} + metadata {{ + table-name = ""{metadataTableName}"" + }} + }} + }} + }} +}}"; + + return ConfigurationFactory.ParseString(specString); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlSpecsFixture.cs new file mode 100644 index 00000000..faab8dbd --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/Postgres/PostgreSqlSpecsFixture.cs @@ -0,0 +1,16 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using Xunit; + +namespace Akka.Persistence.Linq2Db.CompatibilityTests.Docker.Postgres +{ + [CollectionDefinition("PostgreSqlSpec")] + public sealed class PostgreSqlSpecsFixture : ICollectionFixture + { + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerCompatibilitySpecConfig.cs index 2660e91f..d224fa05 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerCompatibilitySpecConfig.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerCompatibilitySpecConfig.cs @@ -11,7 +11,6 @@ public class SqlServerCompatibilitySpecConfig { public static Config InitSnapshotConfig(string tableName) { - DbUtils.ConnectionString = DockerDbUtils.ConnectionString; var specString = $@" akka.persistence {{ publish-plugin-commands = on @@ -19,7 +18,7 @@ public static Config InitSnapshotConfig(string tableName) sql-server {{ class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"" plugin-dispatcher = ""akka.actor.default-dispatcher"" - connection-string = ""{DbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" connection-timeout = 30s schema-name = dbo table-name = ""{tableName}"" @@ -31,7 +30,7 @@ class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persi linq2db {{ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - connection-string = ""{DbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" provider-name = ""{LinqToDB.ProviderName.SqlServer2017}"" table-mapping = sql-server auto-initialize = true @@ -49,13 +48,10 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" public static Config InitJournalConfig(string tableName, string metadataTableName) { - DbUtils.ConnectionString = DockerDbUtils.ConnectionString; var specString = $@" akka.persistence {{ publish-plugin-commands = on journal {{ - plugin = ""akka.persistence.journal.sql-server"" - sql-server {{ class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"" plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" @@ -63,13 +59,13 @@ class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence. metadata-table-name = ""{metadataTableName}"" schema-name = dbo auto-initialize = on - connection-string = ""{DbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" }} linq2db {{ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" plugin-dispatcher = ""akka.persistence.dispatchers.default-plugin-dispatcher"" - connection-string = ""{DbUtils.ConnectionString}"" + connection-string = ""{SqlServerDbUtils.ConnectionString}"" provider-name = ""{LinqToDB.ProviderName.SqlServer2017}"" parallelism = 3 table-mapping = sql-server diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSpecsFixture.cs index 18d31a3a..ae7f1165 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSpecsFixture.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSpecsFixture.cs @@ -1,11 +1,9 @@ -// //----------------------------------------------------------------------- -// // -// // Copyright (C) 2009-2020 Lightbend Inc. -// // Copyright (C) 2013-2020 .NET Foundation -// // -// //----------------------------------------------------------------------- +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; using Xunit; diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonJournalCompatibilitySpec.cs index 53b7b7c5..6e2fec77 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonJournalCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonJournalCompatibilitySpec.cs @@ -11,7 +11,7 @@ public class SqlServerSqlCommonJournalCompatibilitySpec : SqlCommonJournalCompat public SqlServerSqlCommonJournalCompatibilitySpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(outputHelper) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); } protected override string OldJournal => diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonSnapshotCompatibilitySpec.cs index 81fe1c87..1afcaf2a 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonSnapshotCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.DockerTests/SqlServer/SqlServerSqlCommonSnapshotCompatibilitySpec.cs @@ -1,5 +1,4 @@ -using Akka.Persistence.Sql.Linq2Db.Tests.Docker; -using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; +using Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker; using Xunit; using Xunit.Abstractions; @@ -11,7 +10,7 @@ public class SqlServerSqlCommonSnapshotCompatibilitySpec : SqlCommonSnapshotComp public SqlServerSqlCommonSnapshotCompatibilitySpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(outputHelper) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); } protected override string OldSnapshot => diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj index c5a06536..f212deb6 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Akka.Persistence.Linq2Db.Compatibility.Tests.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/JournalCompatActor.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/JournalCompatActor.cs index 43e8ecae..e18058fa 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/JournalCompatActor.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/JournalCompatActor.cs @@ -20,8 +20,8 @@ public DeleteUpToSequenceNumber(long nr) } public class JournalCompatActor : ReceivePersistentActor { - private List events = new List(); - private IActorRef deleteSubscriber; + private readonly List _events = new List(); + private IActorRef _deleteSubscriber; public JournalCompatActor(string journal, string persistenceId) { JournalPluginId = journal; @@ -31,22 +31,22 @@ public JournalCompatActor(string journal, string persistenceId) var sender = Sender; Persist(se, p => { - events.Add(p); + _events.Add(p); sender.Tell(se); }); }); - Command(ce=>Context.Sender.Tell(events.Any(e=>e.Guid==ce.Guid))); + Command(ce=>Context.Sender.Tell(_events.Any(e=>e.Guid==ce.Guid))); Command(gsn => Context.Sender.Tell( - new CurrentSequenceNr(this.LastSequenceNr))); + new CurrentSequenceNr(LastSequenceNr))); Command(dc => { - deleteSubscriber = Context.Sender; + _deleteSubscriber = Context.Sender; DeleteMessages(dc.Number); }); - Command(dms => deleteSubscriber?.Tell(dms)); - Command(dmf=>deleteSubscriber?.Tell(dmf)); - Recover(se => events.Add(se)); + Command(dms => _deleteSubscriber?.Tell(dms)); + Command(dmf=>_deleteSubscriber?.Tell(dmf)); + Recover(se => _events.Add(se)); } public override string PersistenceId { get; } } diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs index 5b802533..6a4f550f 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonJournalCompatibilitySpec.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.Event; using Akka.TestKit; using FluentAssertions; using LanguageExt.UnitsOfMeasure; @@ -50,11 +51,8 @@ public void Can_Recover_SqlCommon_Journal() Probe.ExpectMsg(someEvent, 5.Seconds()); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(NewJournal, "p-1"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -73,10 +71,7 @@ public void Can_Persist_SqlCommon_Journal() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(NewJournal, "p-2"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -103,10 +98,7 @@ public void SqlCommon_Journal_Can_Recover_L2Db_Journal() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(OldJournal, "p-3"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -125,10 +117,7 @@ public void SqlCommon_Journal_Can_Persist_L2db_Journal() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(OldJournal, "p-4"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -180,10 +169,7 @@ public void L2db_Journal_Delete_Compat_mode_Preserves_proper_SequenceNr() var delResult = Probe.ExpectMsg(5.Seconds()); delResult.Should().BeOfType(); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(NewJournal, persistenceId))); @@ -192,10 +178,7 @@ public void L2db_Journal_Delete_Compat_mode_Preserves_proper_SequenceNr() Output.WriteLine($"oldSeq : {currentSequenceNr.SequenceNumber} - newSeq : {reincaranatedSequenceNrNewJournal.SequenceNumber}"); reincaranatedSequenceNrNewJournal.SequenceNumber.Should().Be(currentSequenceNr.SequenceNumber); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new JournalCompatActor(OldJournal, persistenceId))); @@ -204,5 +187,13 @@ public void L2db_Journal_Delete_Compat_mode_Preserves_proper_SequenceNr() Output.WriteLine($"oldSeq : {currentSequenceNr.SequenceNumber} - newSeq : {reincaranatedSequenceNr.SequenceNumber}"); reincaranatedSequenceNr.SequenceNumber.Should().Be(currentSequenceNr.SequenceNumber); } + + private void EnsureTerminated(IActorRef actorRef) + { + Probe.Watch(actorRef); + actorRef.Tell(PoisonPill.Instance); + Probe.ExpectTerminated(actorRef); + Probe.Unwatch(actorRef); + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs index 5c206fd8..3f714b01 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/SqlCommonSnapshotCompatibilitySpec.cs @@ -1,8 +1,8 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using FluentAssertions.Extensions; using Akka.TestKit; -using LanguageExt.UnitsOfMeasure; using Xunit; using Xunit.Abstractions; @@ -11,9 +11,9 @@ namespace Akka.Persistence.Linq2Db.CompatibilityTests public abstract class SqlCommonSnapshotCompatibilitySpec: IAsyncLifetime { protected abstract Configuration.Config Config { get; } - public SqlCommonSnapshotCompatibilitySpec(ITestOutputHelper outputHelper) + public SqlCommonSnapshotCompatibilitySpec(ITestOutputHelper helper) { - Output = outputHelper; + Output = helper; } protected ITestOutputHelper Output { get; } @@ -31,9 +31,10 @@ public Task InitializeAsync() return Task.CompletedTask; } - public async Task DisposeAsync() + public Task DisposeAsync() { - await Sys.Terminate(); + TestKit.Shutdown(); + return Task.CompletedTask; } [Fact] @@ -47,10 +48,7 @@ public void Can_Recover_SqlCommon_Snapshot() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new SnapshotCompatActor(NewSnapshot, "p-1"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -68,10 +66,7 @@ public void Can_Persist_SqlCommon_Snapshot() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new SnapshotCompatActor(NewSnapshot, "p-2"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -95,10 +90,7 @@ public void SqlCommon_Snapshot_Can_Recover_L2Db_Snapshot() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new SnapshotCompatActor(OldSnapshot, "p-3"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -116,10 +108,7 @@ public void SqlCommon_Snapshot_Can_Persist_L2db_Snapshot() Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); Probe.ExpectMsg(true, 5.Seconds()); - Probe.Watch(persistRef); - persistRef.Tell(PoisonPill.Instance); - Probe.ExpectTerminated(persistRef); - Probe.Unwatch(persistRef); + EnsureTerminated(persistRef); persistRef = Sys.ActorOf(Props.Create(() => new SnapshotCompatActor(OldSnapshot, "p-4"))); Probe.Send(persistRef, new ContainsEvent { Guid = ourGuid }); @@ -131,5 +120,13 @@ public void SqlCommon_Snapshot_Can_Persist_L2db_Snapshot() Probe.Send(persistRef, new ContainsEvent { Guid = ourSecondGuid }); Probe.ExpectMsg(true, 10.Seconds()); } + + private void EnsureTerminated(IActorRef actorRef) + { + Probe.Watch(actorRef); + actorRef.Tell(PoisonPill.Instance); + Probe.ExpectTerminated(actorRef); + Probe.Unwatch(actorRef); + } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs index 56013a02..83afedd4 100644 --- a/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs +++ b/src/Akka.Persistence.Linq2Db.Compatibility.Tests/Sqlite/SQLiteCompatibilitySpecConfig.cs @@ -49,7 +49,7 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" }}"; return ConfigurationFactory.ParseString(specString) - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public static Config InitJournalConfig(string tableName, string metadataTableName, string connectionString) diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/Akka.Persistence.Linq2Db.Data.Compatibility.Tests.csproj b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/Akka.Persistence.Linq2Db.Data.Compatibility.Tests.csproj index 189347df..03e9e213 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/Akka.Persistence.Linq2Db.Data.Compatibility.Tests.csproj +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/Akka.Persistence.Linq2Db.Data.Compatibility.Tests.csproj @@ -18,6 +18,8 @@ + + @@ -39,6 +41,42 @@ + + SqlServer\1_Migration_Setup.sql + Always + + + SqlServer\2_Migration.sql + Always + + + SqlServer\3_Post_Migration_Cleanup.sql + Always + + + PostgreSql\1_Migration_Setup.sql + Always + + + PostgreSql\2_Migration.sql + Always + + + PostgreSql\3_Post_Migration_Cleanup.sql + Always + + + MySql\1_Migration_Setup.sql + Always + + + MySql\2_Migration.sql + Always + + + MySql\3_Post_Migration_Cleanup.sql + Always + Always diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpec.cs index a4b845c8..e27f6978 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpec.cs @@ -28,7 +28,7 @@ protected DataCompatibilitySpec(ITestOutputHelper output): base(output) { } - protected sealed override void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider) + protected override void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider) { } diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpecBase.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpecBase.cs index 36436186..209e5a88 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpecBase.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/DataCompatibilitySpecBase.cs @@ -65,10 +65,10 @@ private Config Config() batch-size = 3 db-round-trip-max-batch-size = 6 replay-batch-size = 6 - +{(Settings.SchemaName is { } ? @$" {Settings.TableMapping} {{ - schema-name = {Settings.SchemaName ?? "null"} - }} + schema-name = {Settings.SchemaName} + }}" : "")} }} }} @@ -92,9 +92,10 @@ private Config Config() table-mapping = {Settings.TableMapping} auto-initialize = off +{(Settings.SchemaName is { } ? @$" {Settings.TableMapping} {{ - schema-name = {Settings.SchemaName ?? "null"} - }} + schema-name = {Settings.SchemaName} + }}" : "")} }} }} }}"; @@ -109,7 +110,7 @@ protected virtual void CustomSetup(AkkaConfigurationBuilder builder, IServicePro private void InternalSetup(AkkaConfigurationBuilder builder, IServiceProvider provider) { builder.AddHocon( - Config().WithFallback(Linq2DbPersistence.DefaultConfiguration()), + Config().WithFallback(Linq2DbPersistence.DefaultConfiguration), HoconAddMode.Prepend); Setup(builder, provider); } diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlFixture.cs similarity index 100% rename from src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlSpecsFixture.cs rename to src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlFixture.cs diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlScriptCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlScriptCompatibilitySpec.cs new file mode 100644 index 00000000..cd8be48c --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/MySql/MySqlScriptCompatibilitySpec.cs @@ -0,0 +1,38 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Text; +using MySql.Data.MySqlClient; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.Data.Compatibility.Tests.MySql +{ + [Collection("SqlCompatSpec")] + public class MySqlScriptCompatibilitySpec : SqlScriptCompatibilitySpec + { + public MySqlScriptCompatibilitySpec(ITestOutputHelper output) : base(output) + { + } + + protected override TestSettings Settings => MySqlSpecSettings.Instance; + protected override string ScriptFolder => "MySql"; + protected override void ExecuteSqlScripts(string setup, string migration, string cleanup) + { + using var conn = new MySqlConnection(Fixture.ConnectionString); + var script = new MySqlScript(conn); + + script.Query = setup; + script.Execute(); + + script.Query = migration; + script.Execute(); + + script.Query = cleanup; + script.Execute(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgresSqlSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlFixture.cs similarity index 100% rename from src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgresSqlSpecsFixture.cs rename to src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlFixture.cs diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlScriptCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlScriptCompatibilitySpec.cs new file mode 100644 index 00000000..5306d197 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlScriptCompatibilitySpec.cs @@ -0,0 +1,39 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Npgsql; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.Data.Compatibility.Tests.PostgreSql +{ + [Collection("SqlCompatSpec")] + public class PostgreSqlScriptCompatibilitySpec : SqlScriptCompatibilitySpec + { + public PostgreSqlScriptCompatibilitySpec(ITestOutputHelper output) : base(output) + { + } + + protected override TestSettings Settings => PostgreSqlSpecSettings.Instance; + protected override string ScriptFolder => "PostgreSql"; + protected override void ExecuteSqlScripts(string setup, string migration, string cleanup) + { + using var conn = new NpgsqlConnection(Fixture.ConnectionString); + conn.Open(); + + var cmd = new NpgsqlCommand { + Connection = conn + }; + + cmd.CommandText = setup; + cmd.ExecuteNonQuery(); + cmd.CommandText = migration; + cmd.ExecuteNonQuery(); + cmd.CommandText = cleanup; + cmd.ExecuteNonQuery(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlSpecSettings.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlSpecSettings.cs index fa475cea..8b6d7174 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlSpecSettings.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/PostgreSql/PostgreSqlSpecSettings.cs @@ -17,6 +17,5 @@ private PostgreSqlSpecSettings() public override string ProviderName => LinqToDB.ProviderName.PostgreSQL; public override string TableMapping => "postgresql"; - } } \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlScriptCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlScriptCompatibilitySpec.cs new file mode 100644 index 00000000..e3a5912a --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlScriptCompatibilitySpec.cs @@ -0,0 +1,40 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.IO; +using Akka.Hosting; +using Akka.Persistence.Linq2Db.Data.Compatibility.Tests.Internal; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.Data.Compatibility.Tests +{ + public abstract class SqlScriptCompatibilitySpec: DataCompatibilitySpec where T: ITestContainer, new() + { + protected abstract string ScriptFolder { get; } + + public SqlScriptCompatibilitySpec(ITestOutputHelper output) : base(output) + { + } + + protected override void Setup(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + var workingDir = Path.GetDirectoryName(GetType().Assembly.Location); + var migrationSetup = File.ReadAllText(Path.Combine(workingDir!, ScriptFolder, "1_Migration_Setup.sql")); + var migration = File.ReadAllText(Path.Combine(workingDir!, ScriptFolder, "2_Migration.sql")); + var migrationCleanup = File.ReadAllText(Path.Combine(workingDir!, ScriptFolder, "3_Post_Migration_Cleanup.sql")); + + ExecuteSqlScripts(migrationSetup, migration, migrationCleanup); + + base.Setup(builder, provider); + builder.AddHocon(@" +akka.persistence.journal.linq2db.tag-write-mode = TagTable +akka.persistence.query.journal.linq2db.tag-read-mode = TagTable", HoconAddMode.Prepend); + } + + protected abstract void ExecuteSqlScripts(string setup, string migration, string cleanup); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerCompatibilitySpec.cs index 675e4637..ed473f01 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerCompatibilitySpec.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerCompatibilitySpec.cs @@ -4,12 +4,6 @@ // // ----------------------------------------------------------------------- -using System; -using Akka.Configuration; -using Akka.Hosting; -using Akka.Persistence.Sql.Linq2Db; -using Akka.Persistence.Sql.Linq2Db.Journal; -using Akka.Persistence.Sql.Linq2Db.Snapshot; using Xunit; using Xunit.Abstractions; diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerScriptCompatibilitySpec.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerScriptCompatibilitySpec.cs new file mode 100644 index 00000000..ee3b0156 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerScriptCompatibilitySpec.cs @@ -0,0 +1,35 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Microsoft.Data.SqlClient; +using Microsoft.SqlServer.Management.Common; +using Microsoft.SqlServer.Management.Smo; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Linq2Db.Data.Compatibility.Tests.SqlServer +{ + [Collection("SqlCompatSpec")] + public class SqlServerScriptCompatibilitySpec: SqlScriptCompatibilitySpec + { + public SqlServerScriptCompatibilitySpec(ITestOutputHelper output) : base(output) + { + } + + protected override TestSettings Settings => SqlServerSpecSettings.Instance; + protected override string ScriptFolder => "SqlServer"; + + protected override void ExecuteSqlScripts(string setup, string migration, string cleanup) + { + using var conn = new SqlConnection(Fixture.ConnectionString); + var server = new Server(new ServerConnection(conn)); + + server.ConnectionContext.ExecuteNonQuery(setup); + server.ConnectionContext.ExecuteNonQuery(migration); + server.ConnectionContext.ExecuteNonQuery(cleanup); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecSettings.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecSettings.cs index 11bd078d..a496fec7 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecSettings.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecSettings.cs @@ -17,8 +17,5 @@ private SqlServerSpecSettings() public override string ProviderName => LinqToDB.ProviderName.SqlServer2017; public override string TableMapping => "sql-server"; - - public override string SchemaName => "dbo"; - } } \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecsFixture.cs b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecsFixture.cs index 2b663884..763fec29 100644 --- a/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecsFixture.cs +++ b/src/Akka.Persistence.Linq2Db.Data.Compatibility.Tests/SqlServer/SqlServerSpecsFixture.cs @@ -63,7 +63,8 @@ protected override Task AfterContainerStartedAsync() ["Server"] = $"localhost,{Port}", ["Database"] = DatabaseName, ["User Id"] = User, - ["Password"] = Password + ["Password"] = Password, + ["TrustServerCertificate"] = "true" }; _connectionString = builder.ToString(); diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj new file mode 100644 index 00000000..4c2a6e43 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj @@ -0,0 +1,26 @@ + + + + Exe + netcoreapp3.1 + + + + + + + + + + + + + + + + + Always + + + + diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs new file mode 100644 index 00000000..b8ebaa58 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs @@ -0,0 +1,22 @@ +using CommandLine; + +namespace Akka.Persistence.Linq2Db.IndexHelperApp +{ + public class Options + { + [Option('f',"file", Required=true, HelpText = "Specify the HOCON file to use")] + public string File { get; set; } + + [Option('p',"path", Required = true, HelpText = "The Path to the Akka.Persistence.Linq2Db Config in the HOCON.")] + public string HoconPath { get; set; } + + [Option("OrderingIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an Ordering index")] + public bool GenerateOrdering { get; set; } + + [Option("PidSeqNoIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an index on PersistenceID and SeqNo")] + public bool GeneratePidSeqNo { get; set; } + + [Option("TimeStampIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for a Timestamp Index")] + public bool GenerateTimestamp { get; set; } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs new file mode 100644 index 00000000..117ce111 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs @@ -0,0 +1,117 @@ +using System; +using System.IO; +using Akka.Configuration; +using Akka.Persistence.Linq2Db.IndexHelperLib; +using Akka.Persistence.Sql.Linq2Db; +using Akka.Persistence.Sql.Linq2Db.Config; +using Akka.Persistence.Sql.Linq2Db.Tests; +using CommandLine; +using FluentMigrator.Expressions; +using FluentMigrator.Runner.Generators; +using FluentMigrator.Runner.Generators.Generic; +using FluentMigrator.Runner.Generators.MySql; +using FluentMigrator.Runner.Generators.Oracle; +using FluentMigrator.Runner.Generators.Postgres; +using FluentMigrator.Runner.Generators.Postgres92; +using FluentMigrator.Runner.Generators.SQLite; +using FluentMigrator.Runner.Generators.SqlServer; +using FluentMigrator.Runner.Processors.Postgres; +using LinqToDB; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Akka.Persistence.Linq2Db.IndexHelperApp +{ + public static class Program + { + public static void Main(string[] args) + { + Parser.Default.ParseArguments(args) + .WithParsed(opts => + { + //var str = Linq2DbJournalDefaultSpecConfig.customConfig("testGen", + // "journalTbl", "metadataTbl", ProviderName.SqlServer, + // "connStr"); + var conf = ConfigurationFactory.ParseString(File.ReadAllText(opts.File)); + + var journalConf = new JournalConfig(conf + .GetConfig(opts.HoconPath) + //.GetConfig("akka.persistence.journal.linq2db.testGen") + .WithFallback(Linq2DbPersistence.DefaultConfiguration)); + var generator = GetGenerator(journalConf.ProviderName); + var helper = new JournalIndexHelper(); + GeneratePerOptions(opts, helper, journalConf, generator); + }); + } + + private static void GeneratePerOptions( + Options opts, + JournalIndexHelper helper, + JournalConfig journalConf, + GenericGenerator generator) + { + if (opts.GeneratePidSeqNo) + { + var orderingExpr = new CreateIndexExpression + { + Index = helper.JournalOrdering( + journalConf.TableConfig.EventJournalTable.Name, + journalConf.TableConfig.EventJournalTable.ColumnNames.Ordering, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, orderingExpr, "Ordering"); + + var indexExpr = new CreateIndexExpression + { + Index = helper.DefaultJournalIndex( + journalConf.TableConfig.EventJournalTable.Name, + journalConf.TableConfig.EventJournalTable.ColumnNames.PersistenceId, + journalConf.TableConfig.EventJournalTable.ColumnNames.SequenceNumber, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, indexExpr, "PidAndSequenceNo"); + } + + if (opts.GenerateTimestamp) + { + var timestampExpr = new CreateIndexExpression() + { + Index = helper.JournalTimestamp( + journalConf.TableConfig.EventJournalTable.Name, + journalConf.TableConfig.EventJournalTable.ColumnNames.Created, + journalConf.TableConfig.SchemaName) + }; + GenerateWithHeaderAndFooter(generator, timestampExpr, "Timestamp"); + } + } + + private static void GenerateWithHeaderAndFooter( + GenericGenerator generator, + CreateIndexExpression expr, + string indexType) + { + Console.WriteLine("-------"); + Console.WriteLine($"----{indexType} Index Create Below"); + Console.WriteLine(generator.Generate(expr)); + Console.WriteLine($"----{indexType} Index Create Above"); + Console.WriteLine("-------"); + } + + private static GenericGenerator GetGenerator(string dbArg) + { + const StringComparison comp = StringComparison.InvariantCultureIgnoreCase; + return dbArg switch + { + _ when dbArg.StartsWith("sqlserver", comp) => new SqlServer2008Generator(), + _ when dbArg.Contains("sqlite", comp) => new SQLiteGenerator(), + _ when dbArg.Contains("postgres", comp) => + new Postgres92Generator( + new PostgresQuoter(new PostgresOptions()), + new OptionsWrapper(new GeneratorOptions())), + _ when dbArg.Contains("mysql", comp) => new MySql5Generator(), + _ when dbArg.Contains("oracle", comp) => new OracleGenerator(), + _ => throw new Exception("IDK what to do with this!") + }; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon b/src/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon new file mode 100644 index 00000000..c61a4cc1 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon @@ -0,0 +1,15 @@ +akka.persistence.journal.linq2db { + testGen { + class = "Akka.Persistence.Sql.Linq2Db.Journal.Linq2DbWriteJournal, Akka.Persistence.Sql.Linq2Db" + provider-name = "SqlServer" + connection-string = "connStr" + tables { + journal { + auto-init = true + warn-on-auto-init-fail = false + table-name = "journalTbl" + metadata-table-name = "metadataTbl" + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj b/src/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj new file mode 100644 index 00000000..54627a03 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj @@ -0,0 +1,15 @@ + + + + netstandard2.0 + + + + + + + + + + + diff --git a/src/Akka.Persistence.Linq2Db.IndexHelperLib/JournalIndexHelper.cs b/src/Akka.Persistence.Linq2Db.IndexHelperLib/JournalIndexHelper.cs new file mode 100644 index 00000000..691732d9 --- /dev/null +++ b/src/Akka.Persistence.Linq2Db.IndexHelperLib/JournalIndexHelper.cs @@ -0,0 +1,50 @@ +using System; +using FluentMigrator.Model; + +namespace Akka.Persistence.Linq2Db.IndexHelperLib +{ + public class JournalIndexHelper + { + public IndexDefinition DefaultJournalIndex(string tableName, string persistenceIdCol, string sequenceNoCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName, $"UX_{tableName}_PID_SEQNO"); + //short name for easy compat with all dbs. (*cough* oracle *cough*) + idx.Columns.Add(new IndexColumnDefinition(){ Name = persistenceIdCol }); + idx.Columns.Add(new IndexColumnDefinition(){Name = sequenceNoCol, Direction = Direction.Ascending}); + idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalOrdering(string tableName, + string orderingCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName,$"IX_{tableName}_Ordering"); + idx.Columns.Add(new IndexColumnDefinition(){Name = orderingCol}); + //Should it be? + //idx.IsUnique = true; + return idx; + } + + public IndexDefinition JournalTimestamp(string tableName, + string timestampCol, string schemaName = null) + { + var idx = BeginCreateIndex(tableName, schemaName, + $"IX_{tableName}_TimeStamp"); + idx.Columns.Add(new IndexColumnDefinition(){Name = timestampCol}); + //Not unique by any stretch. + return idx; + } + + private static IndexDefinition BeginCreateIndex(string tableName, string schemaName, string indexName) + { + var idx = new IndexDefinition(); + if (string.IsNullOrWhiteSpace(schemaName) == false) + { + idx.SchemaName = schemaName; + } + idx.TableName = tableName; + idx.Name = indexName; + return idx; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteAllEventsSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteAllEventsSpec.cs index 2fd858a4..c069d797 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteAllEventsSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteAllEventsSpec.cs @@ -42,7 +42,7 @@ public static Config Config(int id) auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteAllEventsSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteAllEventsSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentAllEventsSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentAllEventsSpec.cs index de297de7..99323d19 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentAllEventsSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentAllEventsSpec.cs @@ -43,7 +43,7 @@ public static Config Config(int id) auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteCurrentAllEventsSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteCurrentAllEventsSpec), output) { diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByPersistenceIdSpec.cs index d3e72a94..cf04067c 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByPersistenceIdSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByPersistenceIdSpec.cs @@ -54,7 +54,7 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" }} }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteCurrentEventsByPersistenceIdSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteCurrentEventsByPersistenceIdSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByTagSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByTagSpec.cs index ecef9d75..3b2368a7 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByTagSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentEventsByTagSpec.cs @@ -55,7 +55,7 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteCurrentEventsByTagSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentPersistenceIdsSpec.cs index 151bb511..e87e764e 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteCurrentPersistenceIdsSpec.cs @@ -46,7 +46,7 @@ public static Config Config(int id) auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteCurrentPersistenceIdsSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteCurrentPersistenceIdsSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByPersistenceIdSpec.cs index a125a2a8..d9163814 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByPersistenceIdSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByPersistenceIdSpec.cs @@ -44,7 +44,7 @@ public static Config Config(int id) auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteEventsByPersistenceIdSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteEventsByPersistenceIdSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByTagSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByTagSpec.cs index c323f355..9a056792 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByTagSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqliteEventsByTagSpec.cs @@ -52,7 +52,7 @@ public static Config Config(int id) auto-initialize = on }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } public SqliteEventsByTagSpec(ITestOutputHelper output) : base(Config(Counter.GetAndIncrement()), nameof(SqliteEventsByTagSpec), output) diff --git a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqlitePersistenceIdsSpec.cs b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqlitePersistenceIdsSpec.cs index f9ddfea6..2b94a927 100644 --- a/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqlitePersistenceIdsSpec.cs +++ b/src/Akka.Persistence.Linq2Db.Journal.Query.Tests/SqlitePersistenceIdsSpec.cs @@ -71,7 +71,7 @@ class = ""Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence write-plugin = ""akka.persistence.journal.linq2db"" }} akka.test.single-expect-default = 10s") - .WithFallback(Linq2DbPersistence.DefaultConfiguration()); + .WithFallback(Linq2DbPersistence.DefaultConfiguration); } } diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreDbUtils.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreDbUtils.cs new file mode 100644 index 00000000..ef15df03 --- /dev/null +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreDbUtils.cs @@ -0,0 +1,74 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using Npgsql; + +namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker +{ + public static class PostgreDbUtils + { + public static string ConnectionString { get; private set; } + + public static void Initialize(PostgreSqlFixture fixture) + { + ConnectionString = fixture.ConnectionString; + var connectionBuilder = new NpgsqlConnectionStringBuilder(ConnectionString); + + //connect to postgres database to create a new database + var databaseName = connectionBuilder.Database; + + using var conn = new NpgsqlConnection(ConnectionString); + conn.Open(); + + bool dbExists; + using (var cmd = new NpgsqlCommand()) + { + cmd.CommandText = $@"SELECT TRUE FROM pg_database WHERE datname='{databaseName}'"; + cmd.Connection = conn; + + var result = cmd.ExecuteScalar(); + dbExists = result != null && Convert.ToBoolean(result); + } + + if (dbExists) + { + DoClean(conn); + } + else + { + DoCreate(conn, databaseName); + } + } + + public static void Clean() + { + using var conn = new NpgsqlConnection(ConnectionString); + conn.Open(); + DoClean(conn); + } + + private static void DoCreate(NpgsqlConnection conn, string databaseName) + { + using var cmd = new NpgsqlCommand(); + cmd.CommandText = $@"CREATE DATABASE {databaseName}"; + cmd.Connection = conn; + cmd.ExecuteNonQuery(); + } + + private static void DoClean(NpgsqlConnection conn) + { + using var cmd = new NpgsqlCommand(); + cmd.CommandText = @" + DROP TABLE IF EXISTS public.event_journal; + DROP TABLE IF EXISTS public.snapshot_store; + DROP TABLE IF EXISTS public.metadata;"; + cmd.Connection = conn; + cmd.ExecuteNonQuery(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs index 124c1cf5..c455c95b 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLFixture.cs @@ -5,6 +5,7 @@ using Akka.Util; using Docker.DotNet; using Docker.DotNet.Models; +using Npgsql; using Xunit; namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker @@ -15,7 +16,6 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker public class PostgreSqlFixture : IAsyncLifetime { protected readonly string PostgresContainerName = $"postgresSqlServer-{Guid.NewGuid():N}"; - //protected readonly string PostgreSqlImageName = $"PostgreSQL-{Guid.NewGuid():N}"; protected readonly DockerClient Client; public PostgreSqlFixture() @@ -88,17 +88,18 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters // Provide a 10 second startup delay await Task.Delay(TimeSpan.FromSeconds(10)); - ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};" + - "Database=postgres;User Id=postgres;Password=postgres"; + //ConnectionString = $"Server=127.0.0.1;Port={sqlServerHostPort};Database=postgres;User Id=postgres;Password=postgres"; - //var connectionString = new NpgsqlConnectionStringBuilder() - //{ - // Host = "localhost", Password = "l0lTh1sIsOpenSource", - // Username = "postgres", Database = "postgres", - // Port = sqlServerHostPort - //}; - // - //ConnectionString = connectionString.ToString(); + var connectionString = new NpgsqlConnectionStringBuilder() + { + Host = "localhost", + Password = "postgres", + Username = "postgres", + Database = "postgres", + Port = sqlServerHostPort + }; + + ConnectionString = connectionString.ToString(); } public async Task DisposeAsync() diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLSpecsFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLSpecsFixture.cs deleted file mode 100644 index 7812712a..00000000 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSQLSpecsFixture.cs +++ /dev/null @@ -1,9 +0,0 @@ -using Xunit; - -namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker -{ - [CollectionDefinition("PostgreSQLSpec")] - public sealed class PostgreSqlSpecsFixture : ICollectionFixture - { - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSqlSpecsFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSqlSpecsFixture.cs new file mode 100644 index 00000000..7fadfab9 --- /dev/null +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/PostgreSqlSpecsFixture.cs @@ -0,0 +1,15 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Xunit; + +namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker +{ + [CollectionDefinition("PostgreSqlSpec")] + public sealed class PostgreSqlSpecsFixture : ICollectionFixture + { + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/DockerDbUtils.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerDbUtils.cs similarity index 97% rename from src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/DockerDbUtils.cs rename to src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerDbUtils.cs index d302be3b..829ae5cb 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/DockerDbUtils.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerDbUtils.cs @@ -2,7 +2,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker { - public static class DockerDbUtils + public static class SqlServerDbUtils { public static string ConnectionString { get; private set; } @@ -25,7 +25,6 @@ IF db_id('{0}') IS NULL BEGIN CREATE DATABASE {0} END - ", databaseName); cmd.Connection = conn; diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerSpecsFixture.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerSpecsFixture.cs index 9a11b7a1..90a327bd 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerSpecsFixture.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Docker/SqlServerSpecsFixture.cs @@ -1,4 +1,10 @@ -using Xunit; +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Xunit; namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Docker { diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Postgres/PostgreSQLJournalSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Postgres/PostgreSQLJournalSpec.cs index 115777fa..f24ca121 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Postgres/PostgreSQLJournalSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/Postgres/PostgreSQLJournalSpec.cs @@ -14,7 +14,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Tests.Docker.Postgres { - [Collection("PostgreSQLSpec")] + [Collection("PostgreSqlSpec")] public class PostgreSqlSnapshotSpec : SnapshotStoreSpec { @@ -44,12 +44,11 @@ class = ""{typeof(Linq2DbSnapshotStore).AssemblyQualifiedName}"" public PostgreSqlSnapshotSpec(ITestOutputHelper outputHelper, PostgreSqlFixture fixture) : base(Configuration(fixture)) { - var extension = Linq2DbPersistence.Get(Sys); DebuggingHelpers.SetupTraceDump(outputHelper); var connFactory = new AkkaPersistenceDataConnectionFactory( new SnapshotConfig( Configuration(fixture) - .WithFallback(extension.DefaultConfig) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.snapshot-store.linq2db"))); using (var conn = connFactory.GetConnection()) { @@ -67,7 +66,7 @@ public PostgreSqlSnapshotSpec(ITestOutputHelper outputHelper, PostgreSqlFixture } } - [Collection("PostgreSQLSpec")] + [Collection("PostgreSqlSpec")] public class DockerLinq2DbPostgreSqlJournalSpec : JournalSpec { public static Configuration.Config Create(string connString) @@ -92,9 +91,8 @@ class = ""{typeof(Linq2DbWriteJournal).AssemblyQualifiedName}"" public DockerLinq2DbPostgreSqlJournalSpec(ITestOutputHelper output, PostgreSqlFixture fixture) : base(InitConfig(fixture), "postgresperf", output) { - var extension = Linq2DbPersistence.Get(Sys); - var config = Create(DockerDbUtils.ConnectionString) - .WithFallback(extension.DefaultConfig) + var config = Create(SqlServerDbUtils.ConnectionString) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db"); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig(config)); using (var conn = connFactory.GetConnection()) diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalCustomConfigSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalCustomConfigSpec.cs index 75762aba..912b862c 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalCustomConfigSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalCustomConfigSpec.cs @@ -15,7 +15,7 @@ public class SqlServerJournalCustomConfigSpec : JournalSpec { public static Configuration.Config Initialize(SqlServerFixture fixture) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); return Configuration; } @@ -24,16 +24,15 @@ public static Configuration.Config Initialize(SqlServerFixture fixture) "customSpec", "customJournalSpec", "customJournalMetadata", - ProviderName.SqlServer2017, DockerDbUtils.ConnectionString, true); + ProviderName.SqlServer2017, SqlServerDbUtils.ConnectionString, true); public SqlServerJournalCustomConfigSpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(Initialize(fixture), "SQLServer-custom", outputHelper) { - var extension = Linq2DbPersistence.Get(Sys); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig( Configuration .GetConfig("akka.persistence.journal.customSpec") - .WithFallback(extension.DefaultJournalConfig))); + .WithFallback(Linq2DbPersistence.DefaultJournalConfiguration))); using (var conn = connFactory.GetConnection()) { try diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalDefaultConfigSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalDefaultConfigSpec.cs index b52aef80..ae758ef9 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalDefaultConfigSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalDefaultConfigSpec.cs @@ -15,7 +15,7 @@ public class SqlServerJournalDefaultConfigSpec : JournalSpec { public static Configuration.Config Initialize(SqlServerFixture fixture) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); return Configuration; } @@ -24,15 +24,14 @@ public static Configuration.Config Initialize(SqlServerFixture fixture) "defaultJournalSpec", "defaultJournalMetadata", ProviderName.SqlServer2017, - DockerDbUtils.ConnectionString); + SqlServerDbUtils.ConnectionString); public SqlServerJournalDefaultConfigSpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(Initialize(fixture), "SQLServer-default", outputHelper) { - var extension = Linq2DbPersistence.Get(Sys); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig( Configuration - .WithFallback(extension.DefaultConfig) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db"))); using (var conn = connFactory.GetConnection()) { diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalSpec.cs index 57c7905f..0073ec73 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerJournalSpec.cs @@ -15,19 +15,18 @@ public class SqlServerJournalSpec : JournalSpec { public static Configuration.Config Initialize(SqlServerFixture fixture) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); return Configuration; } private static Configuration.Config Configuration => - SqlServerJournalSpecConfig.Create(DockerDbUtils.ConnectionString, "journalSpec"); + SqlServerJournalSpecConfig.Create(SqlServerDbUtils.ConnectionString, "journalSpec"); public SqlServerJournalSpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(Initialize(fixture), "SQLServer", outputHelper) { - var extension = Linq2DbPersistence.Get(Sys); var config = Configuration - .WithFallback(extension.DefaultConfig) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.journal.linq2db"); var connFactory = new AkkaPersistenceDataConnectionFactory(new JournalConfig(config)); using (var conn = connFactory.GetConnection()) diff --git a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerSnapshotSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerSnapshotSpec.cs index 28853a8d..fd6f4e39 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerSnapshotSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.DockerTests/SqlServer/SQLServerSnapshotSpec.cs @@ -15,20 +15,19 @@ public class SqlServerSnapshotSpec : SnapshotStoreSpec { public static Configuration.Config Initialize(SqlServerFixture fixture) { - DockerDbUtils.Initialize(fixture.ConnectionString); + SqlServerDbUtils.Initialize(fixture.ConnectionString); return Configuration; } - private static Configuration.Config Configuration => SqlServerSnapshotSpecConfig.Create(DockerDbUtils.ConnectionString,"snapshotSpec"); + private static Configuration.Config Configuration => SqlServerSnapshotSpecConfig.Create(SqlServerDbUtils.ConnectionString,"snapshotSpec"); public SqlServerSnapshotSpec(ITestOutputHelper outputHelper, SqlServerFixture fixture) : base(Initialize(fixture)) { DebuggingHelpers.SetupTraceDump(outputHelper); - var extension = Linq2DbPersistence.Get(Sys); var connFactory = new AkkaPersistenceDataConnectionFactory( new SnapshotConfig( Configuration - .WithFallback(extension.DefaultConfig) + .WithFallback(Linq2DbPersistence.DefaultConfiguration) .GetConfig("akka.persistence.snapshot-store.linq2db"))); using (var conn = connFactory.GetConnection()) diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/DbUtils.cs b/src/Akka.Persistence.Sql.Linq2Db.Tests/DbUtils.cs deleted file mode 100644 index 422bdf2f..00000000 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/DbUtils.cs +++ /dev/null @@ -1,64 +0,0 @@ -using System.Data.SqlClient; - -namespace Akka.Persistence.Sql.Linq2Db.Tests -{ - public static class DbUtils - { - public static string ConnectionString { get; set; } - - public static void Initialize(string connectionString) - { - var connectionBuilder = new SqlConnectionStringBuilder(connectionString); - - //connect to postgres database to create a new database - var databaseName = connectionBuilder.InitialCatalog; - connectionBuilder.InitialCatalog = "master"; - ConnectionString = connectionBuilder.ToString(); - - using var conn = new SqlConnection(ConnectionString); - - conn.Open(); - - using (var cmd = new SqlCommand()) - { - cmd.CommandText = string.Format(@" - IF db_id('{0}') IS NULL - BEGIN - CREATE DATABASE {0} - END - - ", databaseName); - cmd.Connection = conn; - - cmd.ExecuteScalar(); - } - - DropTables(conn, databaseName); - - // set this back to the journal/snapshot database - connectionBuilder.InitialCatalog = databaseName; - ConnectionString = connectionBuilder.ToString(); - } - - public static void Clean() - { - var connectionBuilder = new SqlConnectionStringBuilder(ConnectionString); - var databaseName = connectionBuilder.InitialCatalog; - using var conn = new SqlConnection(ConnectionString); - conn.Open(); - DropTables(conn, databaseName); - } - - private static void DropTables(SqlConnection conn, string databaseName) - { - using var cmd = new SqlCommand(); - cmd.CommandText = $@" - USE {databaseName}; - IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'EventJournal') BEGIN DROP TABLE dbo.EventJournal END; - IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'Metadata') BEGIN DROP TABLE dbo.Metadata END; - IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'SnapshotStore') BEGIN DROP TABLE dbo.SnapshotStore END;"; - cmd.Connection = conn; - cmd.ExecuteNonQuery(); - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/JournalConfigSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/JournalConfigSpec.cs index da4bf43a..ee343a0e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/JournalConfigSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/JournalConfigSpec.cs @@ -20,7 +20,7 @@ public class JournalConfigSpec public JournalConfigSpec() { - _defaultConfig = Linq2DbPersistence.DefaultConfiguration(); + _defaultConfig = Linq2DbPersistence.DefaultConfiguration; } [Fact(DisplayName = "Default journal HOCON config should contain default values")] diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/ReadJournalConfigSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/ReadJournalConfigSpec.cs index f84f5316..19623816 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/ReadJournalConfigSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/ReadJournalConfigSpec.cs @@ -21,7 +21,7 @@ public class ReadJournalConfigSpec public ReadJournalConfigSpec() { - _defaultConfig = Linq2DbPersistence.DefaultConfiguration(); + _defaultConfig = Linq2DbPersistence.DefaultConfiguration; } [Fact(DisplayName = "Default journal query HOCON config should contain default values")] diff --git a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/SnapshotConfigSpec.cs b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/SnapshotConfigSpec.cs index d2b50d96..ed8dc2da 100644 --- a/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/SnapshotConfigSpec.cs +++ b/src/Akka.Persistence.Sql.Linq2Db.Tests/Settings/SnapshotConfigSpec.cs @@ -20,7 +20,7 @@ public class SnapshotConfigSpec public SnapshotConfigSpec() { - _defaultConfig = Linq2DbPersistence.DefaultConfiguration(); + _defaultConfig = Linq2DbPersistence.DefaultConfiguration; } [Fact(DisplayName = "Default snapshot HOCON config should contain default values")] diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs index f27d9984..b5890132 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs @@ -9,6 +9,7 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) BufferSize = config.GetInt("buffer-size", 5000); BatchSize = config.GetInt("batch-size", 100); DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000); + DbRoundTripTagBatchSize = config.GetInt("db-round-trip-max-tag-batch-size", 1000); PreferParametersOnMultiRowInsert = config.GetBoolean("prefer-parameters-on-multirow-insert", false); ReplayBatchSize = config.GetInt("replay-batch-size", 1000); Parallelism = config.GetInt("parallelism", 2); @@ -36,6 +37,6 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config) public int BufferSize { get; } public bool SqlCommonCompatibilityMode { get; } - + public int DbRoundTripTagBatchSize { get; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableColumnNames.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableColumnNames.cs index dd51fc89..5c8cc1e2 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableColumnNames.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableColumnNames.cs @@ -3,7 +3,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Config { - public class JournalTableColumnNames + public class JournalTableColumnNames : IEquatable { public JournalTableColumnNames(Configuration.Config config) { @@ -28,9 +28,9 @@ public JournalTableColumnNames(Configuration.Config config) public string Identifier { get; } public string Manifest { get; } - private bool Equals(JournalTableColumnNames other) + public bool Equals(JournalTableColumnNames other) { - return + return other is { } && Ordering == other.Ordering && Deleted == other.Deleted && PersistenceId == other.PersistenceId && @@ -44,7 +44,6 @@ private bool Equals(JournalTableColumnNames other) public override bool Equals(object obj) { - if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; return obj is JournalTableColumnNames j && Equals(j); } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs index c63e12fb..feaaf877 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs @@ -3,17 +3,40 @@ namespace Akka.Persistence.Sql.Linq2Db.Config { + public enum TagWriteMode + { + Csv, + TagTable, + } + + public enum TagTableMode + { + OrderingId, + SequentialUuid + } + public class JournalTableConfig : IEquatable { public string SchemaName { get; } + public TagWriteMode TagWriteMode { get; } + public TagTableMode TagTableMode { get; } = TagTableMode.OrderingId; + public bool UseEventManifestColumn { get; } = false; public EventJournalTableConfig EventJournalTable { get; } public MetadataTableConfig MetadataTable { get; } + public TagTableConfig TagTable { get; } public JournalTableConfig(Configuration.Config config) { var mappingPath = config.GetString("table-mapping"); if (string.IsNullOrEmpty(mappingPath)) throw new ConfigurationException("The configuration property akka.persistence.journal.linq2db.table-mapping is null or empty"); + var s = config.GetString("tag-write-mode", "csv").ToLowerInvariant(); + if (!Enum.TryParse(s, true, out TagWriteMode res)) + { + res = TagWriteMode.Csv; + } + TagWriteMode = res; + // backward compatibility var compat = config.GetString("table-compatibility-mode"); if (compat != null) @@ -24,12 +47,13 @@ public JournalTableConfig(Configuration.Config config) throw new ConfigurationException($"The configuration path akka.persistence.journal.linq2db.{mappingPath} does not exist"); if (mappingPath != "default") - mappingConfig.WithFallback(config.GetConfig("default")); + mappingConfig = mappingConfig.WithFallback(Linq2DbPersistence.DefaultJournalMappingConfiguration); SchemaName = mappingConfig.GetString("schema-name"); EventJournalTable = new EventJournalTableConfig(mappingConfig); MetadataTable = new MetadataTableConfig(mappingConfig); + TagTable = new TagTableConfig(mappingConfig); } public bool Equals(JournalTableConfig other) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs index 40c72290..69726ffc 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs @@ -1,4 +1,6 @@ -namespace Akka.Persistence.Sql.Linq2Db.Config +using System; + +namespace Akka.Persistence.Sql.Linq2Db.Config { public class ReadJournalPluginConfig { @@ -6,10 +8,26 @@ public ReadJournalPluginConfig(Configuration.Config config) { TagSeparator = config.GetString("tag-separator", ";"); Dao = config.GetString("dao", "Akka.Persistence.Sql.Linq2Db.Journal.Dao.ByteArrayJournalDao, Akka.Persistence.Sql.Linq2Db"); + + var tagReadStr = config.GetString("tag-read-mode", "csv").ToLowerInvariant(); + if (!Enum.TryParse(tagReadStr,true,out var tgr)) + { + tgr = TagReadMode.Csv; + } + + TagReadMode = tgr; } public string Dao { get; } public string TagSeparator { get; } + public TagReadMode TagReadMode { get; } + public TagTableMode TagTableMode { get; } + } + + public enum TagReadMode + { + Csv = 1, + TagTable = 2, } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs index 314ce759..11b3c712 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/SnapshotTableConfiguration.cs @@ -18,7 +18,7 @@ public SnapshotTableConfiguration(Configuration.Config config) throw new ConfigurationException($"The configuration path akka.persistence.journal.linq2db.{mappingPath} does not exist"); if (mappingPath != "default") - mappingConfig.WithFallback(config.GetConfig("default")); + mappingConfig.WithFallback(Linq2DbPersistence.DefaultSnapshotMappingConfiguration); SchemaName = mappingConfig.GetString("schema-name"); diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableColumnNames.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableColumnNames.cs new file mode 100644 index 00000000..272e69f4 --- /dev/null +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableColumnNames.cs @@ -0,0 +1,43 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; + +namespace Akka.Persistence.Sql.Linq2Db.Config; + +public class TagTableColumnNames: IEquatable +{ + public TagTableColumnNames(Configuration.Config config) + { + var cfg = config.GetConfig("columns"); + OrderingId = cfg.GetString("ordering-id", "ordering_id"); + Tag = cfg.GetString("tag-value", "tag"); + WriteUuid = cfg.GetString("writer-uuid", "writer_uuid"); + } + + public string OrderingId { get; } + public string Tag { get; } + public string WriteUuid { get; } + + public bool Equals(TagTableColumnNames other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return OrderingId == other.OrderingId && Tag == other.Tag && WriteUuid == other.WriteUuid; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(this, obj)) return true; + return obj is TagTableColumnNames tag && Equals(tag); + } + + public override int GetHashCode() + { + return HashCode.Combine(OrderingId, Tag, WriteUuid); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableConfig.cs new file mode 100644 index 00000000..90c6f4bf --- /dev/null +++ b/src/Akka.Persistence.Sql.Linq2Db/Config/TagTableConfig.cs @@ -0,0 +1,41 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; + +namespace Akka.Persistence.Sql.Linq2Db.Config; + +public class TagTableConfig : IEquatable +{ + public TagTableConfig(Configuration.Config config) + { + var journalConfig = config.GetConfig("tag"); + Name = journalConfig.GetString("table-name"); + ColumnNames = new TagTableColumnNames(journalConfig); + } + + public string Name { get; } + public TagTableColumnNames ColumnNames { get; } + + public bool Equals(TagTableConfig other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Name == other.Name && Equals(ColumnNames, other.ColumnNames); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(this, obj)) return true; + return obj is TagTableConfig cfg && Equals(cfg); + } + + public override int GetHashCode() + { + return HashCode.Combine(Name, ColumnNames); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs index cb102b35..90acac96 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs @@ -140,6 +140,13 @@ private static void MapJournalRow(IProviderConfig config, .Member(r => r.Timestamp) .HasColumnName(columnNames.Created); + //We can skip writing tags the old way by ignoring the column in mapping. + journalRowBuilder.Member(r => r.TagArr).IsNotColumn(); + if (tableConfig.TagWriteMode == TagWriteMode.TagTable) + { + journalRowBuilder.Member(r => r.Tags).IsNotColumn(); + } + if (config.ProviderName.ToLower().Contains("sqlite")) { journalRowBuilder @@ -154,7 +161,59 @@ private static void MapJournalRow(IProviderConfig config, .Member(r=>r.PersistenceId).IsPrimaryKey() .Member(r=>r.SequenceNumber).IsPrimaryKey(); } - + + if (config.TableConfig.UseEventManifestColumn) + { + journalRowBuilder.Member(r => r.EventManifest) + .IsColumn().HasLength(64); + } + else + { + journalRowBuilder.Member(r => r.EventManifest) + .IsNotColumn(); + } + + if (config.TableConfig.TagWriteMode == TagWriteMode.TagTable) + { + var tagConfig = tableConfig.TagTable; + var tagColumns = tagConfig.ColumnNames; + + var tagTableBuilder = fmb.Entity() + .HasSchemaName(tableConfig.SchemaName) + .HasTableName(tagConfig.Name) + .Member(r => r.TagValue).HasColumnName(tagColumns.Tag) + .IsColumn().IsNullable(false) + .HasLength(64) + .IsPrimaryKey() + .Member(r => r.JournalOrderingId).HasColumnName(tagColumns.OrderingId) + .IsColumn().IsPrimaryKey(); + + if (config.TableConfig.TagTableMode == TagTableMode.SequentialUuid) + { + tagTableBuilder.Member(r => r.JournalOrderingId) + .IsNotColumn(); + tagTableBuilder.Member(r => r.WriteUuid).HasColumnName(tagColumns.WriteUuid) + .IsColumn().IsPrimaryKey(); + } + else + { + tagTableBuilder.Member(r => r.WriteUuid) + .IsNotColumn(); + } + } + + // column compatibility + if (config.TableConfig.TagTableMode == TagTableMode.SequentialUuid) + { + journalRowBuilder.Member(r => r.WriteUuid) + .IsColumn(); + } + else + { + journalRowBuilder.Member(r => r.WriteUuid) + .IsNotColumn(); + } + //Probably overkill, but we only set Metadata Mapping if specified //That we are in delete compatibility mode. if (config.IDaoConfig.SqlCommonCompatibilityMode) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Extension.cs b/src/Akka.Persistence.Sql.Linq2Db/Extension.cs index 4a63f532..87645bf1 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Extension.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Extension.cs @@ -16,34 +16,35 @@ public sealed class Linq2DbPersistence : IExtension public const string JournalConfigPath = "akka.persistence.journal.linq2db"; public const string SnapshotStoreConfigPath = "akka.persistence.snapshot-store.linq2db"; - /// - /// Returns a default configuration for akka persistence Linq2Db-based journals and snapshot stores. - /// - /// - public static Configuration.Config DefaultConfiguration() + public static readonly Configuration.Config DefaultJournalConfiguration; + public static readonly Configuration.Config DefaultSnapshotConfiguration; + public static readonly Configuration.Config DefaultConfiguration; + public static readonly Configuration.Config DefaultJournalMappingConfiguration; + public static readonly Configuration.Config DefaultSnapshotMappingConfiguration; + + public readonly Configuration.Config DefaultJournalConfig = DefaultJournalConfiguration; + public readonly Configuration.Config DefaultSnapshotConfig = DefaultSnapshotConfiguration; + public readonly Configuration.Config DefaultConfig = DefaultConfiguration; + public readonly Configuration.Config DefaultJournalMappingConfig = DefaultJournalMappingConfiguration; + public readonly Configuration.Config DefaultSnapshotMappingConfig = DefaultSnapshotMappingConfiguration; + + static Linq2DbPersistence() { var journalConfig = ConfigurationFactory.FromResource("Akka.Persistence.Sql.Linq2Db.persistence.conf"); var snapshotConfig = ConfigurationFactory.FromResource("Akka.Persistence.Sql.Linq2Db.snapshot.conf"); - - return journalConfig.WithFallback(snapshotConfig); - } - - public readonly Configuration.Config DefaultJournalConfig; - public readonly Configuration.Config DefaultSnapshotConfig; - public readonly Configuration.Config DefaultConfig; - public readonly Configuration.Config DefaultJournalMappingConfig; - public readonly Configuration.Config DefaultSnapshotMappingConfig; + + DefaultConfiguration = journalConfig.WithFallback(snapshotConfig); + + DefaultJournalConfiguration = DefaultConfiguration.GetConfig(JournalConfigPath); + DefaultSnapshotConfiguration = DefaultConfiguration.GetConfig(SnapshotStoreConfigPath); + + DefaultJournalMappingConfiguration = DefaultJournalConfiguration.GetConfig("default"); + DefaultSnapshotMappingConfiguration = DefaultSnapshotConfiguration.GetConfig("default"); + } public Linq2DbPersistence(ExtendedActorSystem system) { - DefaultConfig = DefaultConfiguration(); system.Settings.InjectTopLevelFallback(DefaultConfig); - - DefaultJournalConfig = DefaultConfig.GetConfig(JournalConfigPath); - DefaultSnapshotConfig = DefaultConfig.GetConfig(SnapshotStoreConfigPath); - - DefaultJournalMappingConfig = DefaultJournalConfig.GetConfig("default"); - DefaultSnapshotMappingConfig = DefaultSnapshotConfig.GetConfig("default"); } public static Linq2DbPersistence Get(ActorSystem system) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs index fb515a6b..1b2c072c 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs @@ -15,12 +15,44 @@ using Akka.Streams; using Akka.Streams.Dsl; using LanguageExt; -using LinqToDB; +using LinqToDB;using LinqToDB.Common; using LinqToDB.Data; using static LanguageExt.Prelude; namespace Akka.Persistence.Sql.Linq2Db.Journal.Dao { + public class SequentialUuidGenerator + { + private long _counter = DateTime.UtcNow.Ticks; + + /// + /// Gets a value to be assigned to a property. + /// + /// The change tracking entry of the entity for which the value is being generated. + /// The value to be assigned to a property. + public Guid Next() + { + var guidBytes = Guid.NewGuid().ToByteArray(); + var counterBytes = BitConverter.GetBytes(Interlocked.Increment(ref _counter)); + + if (!BitConverter.IsLittleEndian) + { + System.Array.Reverse(counterBytes); + } + + guidBytes[08] = counterBytes[1]; + guidBytes[09] = counterBytes[0]; + guidBytes[10] = counterBytes[7]; + guidBytes[11] = counterBytes[6]; + guidBytes[12] = counterBytes[5]; + guidBytes[13] = counterBytes[4]; + guidBytes[14] = counterBytes[3]; + guidBytes[15] = counterBytes[2]; + + return new Guid(guidBytes); + } + } + public abstract class BaseByteArrayJournalDao : BaseJournalDaoWithReadMessages, IJournalDaoWithUpdates @@ -35,7 +67,8 @@ public abstract class BaseByteArrayJournalDao : protected readonly ILoggingAdapter Logger; private readonly Flow, long)>, NotUsed> _deserializeFlow; private readonly Flow, NotUsed> _deserializeFlowMapped; - + private readonly SequentialUuidGenerator _uuidGen; + protected BaseByteArrayJournalDao( IAdvancedScheduler scheduler, IMaterializer materializer, @@ -50,6 +83,7 @@ protected BaseByteArrayJournalDao( Serializer = serializer; _deserializeFlow = Serializer.DeserializeFlow(); _deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper()); + _uuidGen = new SequentialUuidGenerator(); //Due to C# rules we have to initialize WriteQueue here //Keeping it here vs init function prevents accidental moving of init @@ -59,13 +93,11 @@ protected BaseByteArrayJournalDao( .BatchWeighted( JournalConfig.DaoConfig.BatchSize, cf => cf.Rows.Count, - r => new WriteQueueSet(new List> { r.Tcs }, r.Rows), - (oldRows, newRows) => - { - oldRows.Tcs.Add(newRows.Tcs); - oldRows.Rows = oldRows.Rows.Concat(newRows.Rows); - return oldRows; - }) + r => new WriteQueueSet(ImmutableList.Create(new[] {r.Tcs}), r.Rows), + (oldRows, newRows) => + new WriteQueueSet( + oldRows.Tcs.Add(newRows.Tcs), + oldRows.Rows.Concat(newRows.Rows))) .SelectAsync( JournalConfig.DaoConfig.Parallelism, async promisesAndRows => @@ -142,56 +174,196 @@ private async Task WriteJournalRows(Seq xs) private async Task InsertSingle(Seq xs) { - //If we are writing a single row, - //we don't need to worry about transactions. - await using var db = ConnectionFactory.GetConnection(); - await db.InsertAsync(xs.Head); + if (JournalConfig.TableConfig.TagWriteMode == TagWriteMode.TagTable && xs.Head.TagArr.Length>0) + { + //Lazy fallback; do the InsertMultiple call here and leave it at that. + await InsertMultiple(xs); + } + else + { + //If we are writing a single row, + //we don't need to worry about transactions. + await using var db = ConnectionFactory.GetConnection(); + await db.InsertAsync(xs.Head); + } } + private async Task InsertWithOrderingAndBulkInsertTags(DataConnection dc, Seq xs) + { + var tagsToInsert = new List(xs.Count); + foreach (var journalRow in xs) + { + var dbid = await dc.InsertWithInt64IdentityAsync(journalRow); + foreach (var s1 in journalRow.TagArr) + { + tagsToInsert.Add(new JournalTagRow{ JournalOrderingId = dbid, TagValue = s1 }); + } + } + await dc.GetTable().BulkCopyAsync(new BulkCopyOptions + { + BulkCopyType = BulkCopyType.MultipleRows, + UseParameters = JournalConfig.DaoConfig.PreferParametersOnMultiRowInsert, + MaxBatchSize = JournalConfig.DaoConfig.DbRoundTripTagBatchSize + }, tagsToInsert); + } + + private async Task BulkInsertNoTagTableTags(DataConnection dc, Seq xs) + { + await dc.GetTable().BulkCopyAsync(new BulkCopyOptions + { + BulkCopyType = xs.Count > JournalConfig.DaoConfig.MaxRowByRowSize ? BulkCopyType.Default : BulkCopyType.MultipleRows, + UseParameters = JournalConfig.DaoConfig.PreferParametersOnMultiRowInsert, + MaxBatchSize = JournalConfig.DaoConfig.DbRoundTripBatchSize + }, xs); + } + private async Task InsertMultiple(Seq xs) + { + if (JournalConfig.TableConfig.TagWriteMode == TagWriteMode.TagTable) + { + if (JournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId) + { + await HandleTagTableInsert(xs); + } + else + { + await HandleTagTableUuidInsert(xs); + } + } + else + { + await HandleDefaultInsert(xs); + } + } + + private async Task HandleTagTableUuidInsert(Seq xs) + { + var tagWrites = new List(); + foreach (var journalRow in xs) + { + if (journalRow.TagArr?.Length > 0) + { + var uid = NextUuid(); + journalRow.WriteUuid = uid; + foreach (var s1 in journalRow.TagArr) + { + tagWrites.Add(new JournalTagRow { WriteUuid = uid, TagValue = s1 }); + } + } + } + + await using var ctx = ConnectionFactory.GetConnection(); + await using var tx = await ctx.BeginTransactionAsync(); + try + { + await ctx.BulkCopyAsync(new BulkCopyOptions + { + TableName = JournalConfig.TableConfig.EventJournalTable.Name, + MaxBatchSize = JournalConfig.DaoConfig.DbRoundTripBatchSize + },xs); + + if (tagWrites.Count > 0) + { + await ctx.BulkCopyAsync(new BulkCopyOptions + { + TableName = JournalConfig.TableConfig.TagTable.Name, + MaxBatchSize = JournalConfig.DaoConfig.DbRoundTripTagBatchSize, + UseParameters = JournalConfig.DaoConfig.PreferParametersOnMultiRowInsert + }, tagWrites); + } + await ctx.CommitTransactionAsync(); + } + catch (Exception e1) + { + try + { + await ctx.RollbackTransactionAsync(); + } + catch (Exception e2) + { + throw new AggregateException(e2, e1); + } + + throw; + } + } + + private Guid NextUuid() + { + return _uuidGen.Next(); + } + + private async Task HandleDefaultInsert(Seq xs) { await using var db = ConnectionFactory.GetConnection(); - + await using var tx = await db.BeginTransactionAsync(IsolationLevel.ReadCommitted); try { - await db.BeginTransactionAsync(IsolationLevel.ReadCommitted); - await db.GetTable() - .BulkCopyAsync( - new BulkCopyOptions - { - BulkCopyType = xs.Count > JournalConfig.DaoConfig.MaxRowByRowSize - ? BulkCopyType.Default - : BulkCopyType.MultipleRows, - UseParameters = JournalConfig.DaoConfig.PreferParametersOnMultiRowInsert, - MaxBatchSize = JournalConfig.DaoConfig.DbRoundTripBatchSize - }, xs); + await BulkInsertNoTagTableTags(db, xs); await db.CommitTransactionAsync(); } - catch (Exception e) + catch (Exception e1) { try { await db.RollbackTransactionAsync(); } - catch (Exception exception) + catch (Exception e2) { - throw new AggregateException(exception, e); + throw new AggregateException(e2, e1); } + + throw; + } + } + private async Task HandleTagTableInsert(Seq xs) + { + await using var db = ConnectionFactory.GetConnection(); + await using var tx = await db.BeginTransactionAsync(IsolationLevel.ReadCommitted); + try + { + await ConsumeSequenceForTagInsert(xs, db); + await db.CommitTransactionAsync(); + } + catch (Exception e1) + { + try + { + await db.RollbackTransactionAsync(); + } + catch (Exception e2) + { + throw new AggregateException(e2, e1); + } + throw; } } + private async Task ConsumeSequenceForTagInsert(Seq xs, DataConnection db) + { + var tail = xs; + while (tail.Count > 0) + { + (var noTags, tail) = tail.Span(r => r.TagArr.Length == 0); + if (noTags.Count > 0) + { + await BulkInsertNoTagTableTags(db, noTags); + } + + (var hasTags, tail) = tail.Span(r => r.TagArr.Length > 0); + if (hasTags.Count > 0) + { + await InsertWithOrderingAndBulkInsertTags(db, hasTags); + } + } + } + //By using a custom flatten here, we avoid an Enumerable/LINQ allocation //And are able to have a little more control over default capacity of array. - private static List FlattenListOfListsToList(List>> source) + private static List FlattenListOfListsToList(List> source) { - //List ResultSet( - // Akka.Util.Try> item) - //{ - // return item.Success.GetOrElse(new List(0)); - //} - var rows = new List(source.Count > 4 ? source.Count:4); foreach (var t in source) { @@ -200,7 +372,6 @@ private static List FlattenListOfListsToList(List> AsyncWriteMessages( { var serializedTries = Serializer.Serialize(messages, timeStamp); - //Fold our List of Lists into a single sequence + // Fold our List of Lists into a single sequence var rows = Seq(FlattenListOfListsToList(serializedTries)); - //Wait for the write to go through. If Task fails, write will be captured - //As WriteMessagesFailure. + // Wait for the write to go through. If Task fails, write will be captured as WriteMessagesFailure. await QueueWriteJournalRows(rows); - //If we get here, we build an ImmutableList containing our rejections. - //These will be captured as WriteMessagesRejected + // If we get here, we build an ImmutableList containing our rejections. + // These will be captured as WriteMessagesRejected return BuildWriteRejections(serializedTries); } - protected static ImmutableList BuildWriteRejections( - List>> serializedTries) + protected static ImmutableList BuildWriteRejections(List> serializedTries) { var builderEx = new Exception[serializedTries.Count]; for (var i = 0; i < serializedTries.Count; i++) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs index 3fc7c4c3..7b7dffbc 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs @@ -47,15 +47,21 @@ protected BaseJournalDaoWithReadMessages( (Math.Max(1, fromSequenceNr), FlowControlEnum.Continue), async opt => { - async Task>)>> RetrieveNextBatch() + async Task>> BatchFromDb(string s, long l, int i, long fromSeqNo) { Seq> msg; - await using (var conn = ConnectionFactory.GetConnection()) - { - msg = await Messages(conn, persistenceId, opt.seqNr, toSequenceNr, batchSize) - .RunWith(ExtSeq.Seq>(), Materializer); - } + await using var conn = ConnectionFactory.GetConnection(); + msg = await Messages(conn, s, fromSeqNo, l, i) + .RunWith(ExtSeq.Seq>(), Materializer); + + return msg; + } + async Task>)>> RetrieveNextBatch(long fromSeq) + { + Seq> msg; + msg = await BatchFromDb(persistenceId, toSequenceNr, batchSize, fromSeq); + var hasMoreEvents = msg.Count == batchSize; //var lastMsg = msg.IsEmpty.LastOrDefault(); var lastSeq = Util.Option.None; @@ -88,8 +94,7 @@ protected BaseJournalDaoWithReadMessages( nextFrom = lastSeq.Value + 1; } - return new Util.Option<((long, FlowControlEnum), Seq>)>(( - (nextFrom, nextControl), msg)); + return new Util.Option<((long, FlowControlEnum), Seq>)>(((nextFrom, nextControl), msg)); } return opt.flowControl switch @@ -97,10 +102,10 @@ protected BaseJournalDaoWithReadMessages( FlowControlEnum.Stop => Util.Option<((long, FlowControlEnum), Seq>)>.None, - FlowControlEnum.Continue => await RetrieveNextBatch(), + FlowControlEnum.Continue => await RetrieveNextBatch(opt.seqNr), FlowControlEnum.ContinueDelayed when refreshInterval.HasValue => - await FutureTimeoutSupport.After(refreshInterval.Value.Item1, refreshInterval.Value.Item2, RetrieveNextBatch), + await FutureTimeoutSupport.After(refreshInterval.Value.Item1, refreshInterval.Value.Item2, () => RetrieveNextBatch(opt.seqNr)), _ => InvalidFlowThrowHelper(opt) }; diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs index 2d3fdf14..f38a6957 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Immutable; +using System.Linq; using Akka.Actor; using Akka.Persistence.Sql.Linq2Db.Config; using Akka.Persistence.Sql.Linq2Db.Journal.Types; @@ -18,6 +19,7 @@ public sealed class ByteArrayJournalSerializer : FlowPersistentReprSerializer _journalConfig; private readonly string[] _separatorArray; + private readonly TagWriteMode _tagWriteMode; public ByteArrayJournalSerializer(IProviderConfig journalConfig, Akka.Serialization.Serialization serializer, string separator) { @@ -25,6 +27,7 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon _serializer = serializer; _separator = separator; _separatorArray = new[] {_separator}; + _tagWriteMode = journalConfig.TableConfig.TagWriteMode; } /// @@ -37,6 +40,27 @@ private static string StringSep(IImmutableSet tags, string separator) { return tags.Count == 0 ? "" : $"{separator}{string.Join(separator, tags)}{separator}"; } + + private JournalRow CreateJournalRow(IImmutableSet tags, IPersistentRepresentation representation, long ts) + { + return _tagWriteMode switch + { + TagWriteMode.Csv => new JournalRow + { + Tags = StringSep(tags, _separator), + Timestamp = representation.Timestamp == 0 ? ts : representation.Timestamp + }, + + TagWriteMode.TagTable => new JournalRow + { + Tags = "", + TagArr = tags.ToArray(), + Timestamp = representation.Timestamp == 0 ? ts : representation.Timestamp + }, + + _ => throw new Exception($"Invalid Tag Write Mode! Was: {_tagWriteMode}") + }; + } protected override Try Serialize( IPersistentRepresentation persistentRepr, @@ -52,36 +76,31 @@ protected override Try Serialize( state: ( persistentRepr, _serializer.FindSerializerForType(persistentRepr.Payload.GetType(), _journalConfig.DefaultSerializer), - StringSep(tTags,_separator), - timeStamp + CreateJournalRow(tTags,persistentRepr,timeStamp) ), action: state => { - var (representation, serializer, tags, ts) = state; - var thisManifest = ""; + var (representation, serializer, row) = state; if (serializer is SerializerWithStringManifest withStringManifest) { - thisManifest = withStringManifest.Manifest(representation.Payload); + row.Manifest = withStringManifest.Manifest(representation.Payload); } - else + else if (serializer.IncludeManifest) { - if (serializer.IncludeManifest) - { - thisManifest = representation.Payload.GetType().TypeQualifiedName(); - } + row.Manifest = representation.Payload.GetType().TypeQualifiedName(); } - return new Try(new JournalRow + else { - Message = serializer.ToBinary(representation.Payload), - Manifest = thisManifest, - PersistenceId = representation.PersistenceId, - Tags = tags, - Identifier = serializer.Identifier, - SequenceNumber = representation.SequenceNr, - Timestamp = representation.Timestamp == 0 - ? ts - : representation.Timestamp - }); + row.Manifest = ""; + } + + row.Message = serializer.ToBinary(representation.Payload); + row.PersistenceId = representation.PersistenceId; + row.Identifier = serializer.Identifier; + row.SequenceNumber = representation.SequenceNr; + row.EventManifest = representation.Manifest; + + return new Try(row); }); } catch (Exception e) @@ -109,14 +128,14 @@ protected override Try Serialize( action: state => state.Item1.FromBinary(state.message, state.type)), sequenceNr: t.SequenceNumber, persistenceId: t.PersistenceId, - manifest: t.Manifest, + manifest: t.EventManifest ?? t.Manifest, isDeleted: t.Deleted, sender: ActorRefs.NoSender, writerGuid: null, timestamp: t.Timestamp), t.Tags? .Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + .ToImmutableHashSet() ?? t.TagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, t.Ordering)); } @@ -126,14 +145,14 @@ protected override Try Serialize( payload: _serializer.Deserialize(t.Message, identifierMaybe.Value, t.Manifest), sequenceNr: t.SequenceNumber, persistenceId: t.PersistenceId, - manifest: t.Manifest, + manifest: t.EventManifest ?? t.Manifest, isDeleted: t.Deleted, sender: ActorRefs.NoSender, writerGuid: null, timestamp: t.Timestamp), t.Tags? .Split(_separatorArray, StringSplitOptions.RemoveEmptyEntries) - .ToImmutableHashSet() ?? ImmutableHashSet.Empty, + .ToImmutableHashSet() ?? t.TagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty, t.Ordering)); } catch (Exception e) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs index d32f1340..5bf84ec0 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Linq2DbWriteJournal.cs @@ -44,11 +44,8 @@ public static DateTime FromUnixEpochMillis(in long unixEpochMillis) public class Linq2DbWriteJournal : AsyncWriteJournal { - [Obsolete(message: "Use Linq2DbPersistence.Get(ActorSystem).DefaultConfig instead")] - public static readonly Configuration.Config DefaultConfiguration = - ConfigurationFactory.FromResource("Akka.Persistence.Sql.Linq2Db.persistence.conf"); - - public readonly Linq2DbPersistence Extension = Linq2DbPersistence.Get(Context.System); + [Obsolete(message: "Use Linq2DbPersistence.DefaultConfiguration or Linq2DbPersistence.Get(ActorSystem).DefaultConfig instead")] + public static readonly Configuration.Config DefaultConfiguration = Linq2DbPersistence.DefaultConfiguration; private readonly ActorMaterializer _mat; private readonly JournalConfig _journalConfig; @@ -61,7 +58,7 @@ public Linq2DbWriteJournal(Configuration.Config journalConfig) try { - var config = journalConfig.WithFallback(Extension.DefaultJournalConfig); + var config = journalConfig.WithFallback(Linq2DbPersistence.DefaultJournalConfiguration); _journalConfig = new JournalConfig(config); _mat = Materializer.CreateSystemMaterializer( context: (ExtendedActorSystem)Context.System, @@ -79,11 +76,11 @@ public Linq2DbWriteJournal(Configuration.Config journalConfig) connection: new AkkaPersistenceDataConnectionFactory(_journalConfig), journalConfig: _journalConfig, serializer: Context.System.Serialization, - logger: Logging.GetLogger(Context.System, typeof(ByteArrayJournalDao))); + logger: Context.GetLogger()); } catch (Exception e) { - _log.Error(e, "Error Initializing Journal!"); + Context.GetLogger().Error(e, "Error Initializing Journal!"); throw; } @@ -95,7 +92,7 @@ public Linq2DbWriteJournal(Configuration.Config journalConfig) } catch (Exception e) { - _log.Warning(e, "Unable to Initialize Persistence Journal Table!"); + Context.GetLogger().Warning(e, "Unable to Initialize Persistence Journal Table!"); } } } diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs index ed8f4e45..69508efd 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs @@ -1,8 +1,16 @@ -using LinqToDB; -using LinqToDB.Mapping; +using System; namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { + public sealed class JournalTagRow + { + public long JournalOrderingId { get; set; } + + public string TagValue { get; set; } + + public Guid WriteUuid { get; set; } + } + public sealed class JournalRow { public long Ordering { get; set; } @@ -22,5 +30,11 @@ public sealed class JournalRow public string Manifest { get; set; } public int? Identifier { get; set; } + + public string[] TagArr { get; set; } + + public Guid? WriteUuid { get; set; } + + public string EventManifest { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/WriteQueueSet.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/WriteQueueSet.cs index b970a39f..325bc51a 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/WriteQueueSet.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/WriteQueueSet.cs @@ -1,4 +1,4 @@ -using System.Collections.Generic; +using System.Collections.Immutable; using System.Threading.Tasks; using LanguageExt; @@ -6,14 +6,14 @@ namespace Akka.Persistence.Sql.Linq2Db.Journal.Types { public sealed class WriteQueueSet { - public WriteQueueSet(List> tcs, Seq rows) + public WriteQueueSet(ImmutableList> tcs, Seq rows) { Tcs = tcs; Rows = rows; } - public Seq Rows { get; set; } + public Seq Rows { get; } - public List> Tcs { get; } + public ImmutableList> Tcs { get; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs index baa851ac..d9411df1 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Linq.Expressions; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Sql.Linq2Db.Config; @@ -14,6 +17,7 @@ using Akka.Util; using LinqToDB; using LinqToDB.Data; +using LinqToDB.Tools; namespace Akka.Persistence.Sql.Linq2Db.Query.Dao { @@ -55,6 +59,7 @@ public Source AllPersistenceIdsSource(long max) }); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int MaxTake(long max) { return max > int.MaxValue ? int.MaxValue : (int)max; @@ -72,18 +77,77 @@ private static int MaxTake(long max) async input=> { await using var conn = input._connectionFactory.GetConnection(); - - return await conn.GetTable() - .OrderBy(r => r.Ordering) + var events = await conn.GetTable() .Where(r => r.Ordering > input.offset && r.Ordering <= input.maxOffset && r.Deleted == false) + .OrderBy(r => r.Ordering) .Take(input.maxTake).ToListAsync(); + return await AddTagDataIfNeeded(events, conn); } ).Via(_deserializeFlow); } + public async Task> AddTagDataIfNeeded(List toAdd, DataConnection context) + { + if (_readJournalConfig.PluginConfig.TagReadMode == TagReadMode.TagTable) + { + await AddTagDataFromTagTable(toAdd, context); + } + return toAdd; + } + + private async Task AddTagDataFromTagTable(List toAdd, DataConnection context) + { + var pred = TagCheckPredicate(toAdd); + var tagRows = pred.HasValue + ? await context.GetTable().Where(pred.Value).ToListAsync() + : new List(); + if (_readJournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId) + { + foreach (var journalRow in toAdd) + { + journalRow.TagArr = tagRows + .Where(r => r.JournalOrderingId == journalRow.Ordering) + .Select(r => r.TagValue).ToArray(); + } + } + else + { + foreach (var journalRow in toAdd) + { + journalRow.TagArr = tagRows + .Where(r => r.WriteUuid == journalRow.WriteUuid) + .Select(r => r.TagValue).ToArray(); + } + } + } + + public Option>> TagCheckPredicate(List toAdd) + { + if (_readJournalConfig.PluginConfig.TagTableMode == TagTableMode.SequentialUuid) + { + //Check whether we have anything to query for two reasons: + //1: Linq2Db may choke on an empty 'in' set. + //2: Don't wanna make a useless round trip to the DB, + // if we know nothing is tagged. + var set = toAdd + .Where(r => r.WriteUuid.HasValue) + .Select(r => r.WriteUuid.Value).ToList(); + return set.Count == 0 + ? Option>>.None + : new Option>>(r => r.WriteUuid.In(set)); + } + + //We can just check the count here. + //Alas, we won't know if there are tags + //Until we actually query on this one. + return toAdd.Count == 0 + ? Option>>.None + : new Option>>( r => r.JournalOrderingId.In(toAdd.Select(r => r.Ordering))); + } + public Source, long)>, NotUsed> EventsByTag( string tag, long offset, @@ -92,23 +156,138 @@ private static int MaxTake(long max) { var separator = _readJournalConfig.PluginConfig.TagSeparator; var maxTake = MaxTake(max); - + switch (_readJournalConfig.PluginConfig.TagReadMode) + { + case TagReadMode.Csv: + return AsyncSource.FromEnumerable( + new { separator, tag, offset, maxOffset, maxTake, ConnectionFactory }, + async input => + { + var tagValue = $"{separator}{input.tag}{separator}"; + await using var conn = input.ConnectionFactory.GetConnection(); + + return await conn.GetTable() + .Where(r => + r.Tags.Contains(tagValue) + && !r.Deleted + && r.Ordering > input.offset + && r.Ordering <= input.maxOffset) + .OrderBy(r => r.Ordering) + .Take(input.maxTake).ToListAsync(); + }) + .Via(_deserializeFlow); + case TagReadMode.TagTable: + return EventByTagTableOnly(tag, offset, maxOffset, separator, maxTake); + default: + throw new ArgumentOutOfRangeException(); + } + } + + private Source, long)>, NotUsed> EventByTagTableOnly( + string tag, + long offset, + long maxOffset, + string separator, + int maxTake) + { + return AsyncSource + .FromEnumerable( + new { ConnectionFactory, separator, tag, offset, maxOffset, maxTake }, + async input => + { + //TODO: Optimize Flow + await using var conn = input.ConnectionFactory.GetConnection(); + //First, Get eligible rows. + var mainRows = await conn.GetTable() + .LeftJoin( + conn.GetTable(), + EventsByTagOnlyJoinPredicate, + (jr, jtr) => new { jr, jtr }) + .Where(r => r.jtr.TagValue == input.tag) + .Select(r => r.jr) + .Where(r => r.Ordering > input.offset && r.Ordering <= input.maxOffset && !r.Deleted) + .Take(input.maxTake).ToListAsync(); + await AddTagDataFromTagTable(mainRows, conn); + return mainRows; + }) + //We still PerfectlyMatchTag here + //Because DB Collation :) + .Via(PerfectlyMatchTag(tag, separator)) + .Via(_deserializeFlow); + } + + private Expression> EventsByTagOnlyJoinPredicate + { + get + { + if (_readJournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId) + return (jr, jtr) => jr.Ordering == jtr.JournalOrderingId; + + return (jr, jtr) => jr.WriteUuid == jtr.WriteUuid; + } + } + + private Source, long)>, NotUsed> EventByTagMigration( + string tag, + long offset, + long maxOffset, + string separator, + int maxTake) + { return AsyncSource.FromEnumerable( - new{ separator, tag, offset, maxOffset, maxTake, _connectionFactory = ConnectionFactory}, - async input => - { - var tagValue = $"{separator}{input.tag}{separator}"; - await using var conn = input._connectionFactory.GetConnection(); - - return await conn.GetTable() - .Where(r => - r.Tags.Contains(tagValue) && - r.Deleted == false) - .OrderBy(r => r.Ordering) - .Where(r => r.Ordering > input.offset && r.Ordering <= input.maxOffset) - .Take(input.maxTake).ToListAsync(); - }) + new { ConnectionFactory, separator, tag, offset, maxOffset, maxTake }, + async input => + { + // NOTE: This flow is probably not performant, + // It is meant to allow for safe migration + // And is not necessarily intended for long term use + await using var conn = input.ConnectionFactory.GetConnection(); + + // First, find the rows. + // We use IN here instead of left join because it's safer from a + // 'avoid duplicate rows tripping things up later' standpoint. + var mainRows = await conn.GetTable() + .Where(EventsByTagMigrationPredicate(conn, input.tag, separator)) + .OrderBy(r => r.Ordering) + .Where(r => r.Ordering > input.offset && r.Ordering <= input.maxOffset && r.Deleted == false) + .Take(input.maxTake).ToListAsync(); + + await AddTagDataFromTagTable(mainRows, conn); + return mainRows; + }) .Via(_deserializeFlow); + } + + private Expression> EventsByTagMigrationPredicate(DataConnection conn, string tag, string separator) + { + var tagValue = $"{separator}{tag}{separator}"; + if (_readJournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId) + { + return r => + r.Tags.Contains(tagValue) || + r.Ordering.In( + conn.GetTable() + .Where(j => j.TagValue == tag) + .Select(j => j.JournalOrderingId)); + } + return r => + r.Tags.Contains(tagValue) || + r.WriteUuid.Value.In( + conn.GetTable() + .Where(j => j.TagValue == tag) + .Select(j => j.WriteUuid)); + } + + private Flow PerfectlyMatchTag( + string tag, + string separator) + { + //Do the tagArr check first here + //Since the logic is simpler. + return Flow.Create() + .Where(r => r.TagArr?.Contains(tag) ?? (r.Tags ?? "") + .Split( new[] { separator }, StringSplitOptions.RemoveEmptyEntries ) + .Any(t => t.Contains(tag))); } public override Source, NotUsed> Messages( @@ -153,6 +332,7 @@ public Source JournalSequence(long offset, long limit) { await using var conn = input._connectionFactory.GetConnection(); + //persistence-jdbc does not filter deleted here. return await conn.GetTable() .Where(r => r.Ordering > input.offset) .Select(r => r.Ordering) @@ -164,6 +344,7 @@ public async Task MaxJournalSequenceAsync() { await using var db = ConnectionFactory.GetConnection(); + //persistence-jdbc does not filter deleted here. return await db.GetTable() .Select(r => r.Ordering) .OrderByDescending(r => r) diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs index fe1b952a..b316921e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs @@ -8,7 +8,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Serialization { public abstract class PersistentReprSerializer { - public List>> Serialize(IEnumerable messages, long timeStamp = 0) + public List> Serialize(IEnumerable messages, long timeStamp = 0) { return messages.Select(aw => { @@ -20,7 +20,7 @@ public abstract class PersistentReprSerializer //We will only enumerate if we have more than one element. if (aw.Payload is not IImmutableList payloads) { - return new Util.Try>( + return new Util.Try( new ArgumentNullException( $"{aw.PersistenceId} received empty payload for sequenceNr range " + $"{aw.LowestSequenceNr} - {aw.HighestSequenceNr}")); @@ -28,40 +28,65 @@ public abstract class PersistentReprSerializer //Preallocate our list; In the common case //This saves a tiny bit of garbage - var retList = new List(payloads.Count); - if (payloads.Count == 1) - { - // If there's only one payload - // Don't allocate the enumerable. - var ser = Serialize(payloads[0], timeStamp); - var opt = ser.Success; - if (opt.HasValue) - { - retList.Add(opt.Value); - return new Util.Try>(retList); - } - - return new Util.Try>(ser.Failure.Value); - } - - foreach (var p in payloads) + var retList = new T[payloads.Count]; + for (var idx = 0; idx < payloads.Count; idx++) { + var p = payloads[idx]; var ser = Serialize(p, timeStamp); var opt = ser.Success; if (opt.HasValue) { - retList.Add(opt.Value); + retList[idx] = opt.Value; } else { - return new Util.Try>(ser.Failure.Value); + return new Util.Try(ser.Failure.Value); } } - return new Util.Try>(retList); + return new Util.Try(retList); }).ToList(); } + private List> HandleSerializeList(long timeStamp, AtomicWrite[] msgArr) + { + var fullSet = new List>(msgArr.Length); + for (var i = 0; i < msgArr.Length; i++) + { + if (msgArr[i].Payload is not IImmutableList payloads) + { + fullSet.Add(new Util.Try(new ArgumentNullException( + $"{msgArr[i].PersistenceId} received empty payload for sequenceNr range " + + $"{msgArr[i].LowestSequenceNr} - {msgArr[i].HighestSequenceNr}"))); + } + else + { + fullSet.Add(SerializerItem(timeStamp, payloads)); + } + } + + return fullSet; + } + + private Util.Try SerializerItem(long timeStamp, IImmutableList payloads) + { + var retList = new T[payloads.Count]; + for (var j = 0; j < payloads.Count; j++) + { + var ser = Serialize(payloads[j], timeStamp); + var opt = ser.Success; + if (opt.HasValue) + { + retList[j] = opt.Value; + } + else + { + return new Util.Try(ser.Failure.Value); + } + } + + return new Util.Try(retList); + } public Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0) { diff --git a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/Linq2DbSnapshotStore.cs b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/Linq2DbSnapshotStore.cs index 5416b207..4904a86d 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/Snapshot/Linq2DbSnapshotStore.cs +++ b/src/Akka.Persistence.Sql.Linq2Db/Snapshot/Linq2DbSnapshotStore.cs @@ -19,15 +19,13 @@ public class Linq2DbSnapshotStore : SnapshotStore public static readonly Configuration.Config DefaultConfiguration = ConfigurationFactory.FromResource("Akka.Persistence.Sql.Linq2Db.snapshot.conf"); - public readonly Linq2DbPersistence Extension = Linq2DbPersistence.Get(Context.System); - private readonly SnapshotConfig _snapshotConfig; private readonly ByteArraySnapshotDao _dao; public Linq2DbSnapshotStore(Configuration.Config snapshotConfig) { - var config = snapshotConfig.WithFallback(Extension.DefaultSnapshotConfig); + var config = snapshotConfig.WithFallback(Linq2DbPersistence.DefaultSnapshotConfiguration); _snapshotConfig = new SnapshotConfig(config); diff --git a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf index cd2ef336..55a9be1e 100644 --- a/src/Akka.Persistence.Sql.Linq2Db/persistence.conf +++ b/src/Akka.Persistence.Sql.Linq2Db/persistence.conf @@ -81,6 +81,8 @@ # You may wish to provide a dedicated dispatcher instead materializer-dispatcher = "akka.actor.default-dispatcher" + tag-write-mode = csv + # should corresponding journal table be initialized automatically #if delete-compatibility-mode is true, both tables are created #if delete-compatibility-mode is false, only journal table will be created. @@ -129,6 +131,15 @@ sequence-number = sequence_number } } + + tag { + table-name = "tags" + columns { + ordering-id = ordering_id + tag-value = tag + writer-uuid = writer_uuid + } + } } # Akka.Persistence.SqlServer compatibility table name and column name mapping @@ -215,7 +226,7 @@ } } - postgres = ${akka.persistence.journal.linq2db.postgresql} # backward compat naming + # postgres = ${akka.persistence.journal.linq2db.postgresql} # backward compat naming # Akka.Persistence.MySql compatibility table name and column name mapping mysql { @@ -258,6 +269,8 @@ refresh-interval = 1s # interval for refreshing connection-string = "" # Connection String is Required! + + tag-read-mode = csv journal-sequence-retrieval{ batch-size = 10000 diff --git a/src/common.props b/src/common.props index abf9ef83..e1601298 100644 --- a/src/common.props +++ b/src/common.props @@ -20,14 +20,14 @@ 3.3.0 1.4.46 - 0.5.1 - 1.4.45 + 0.5.2-beta1 + 1.4.46 1.4.35 1.4.35 6.0.7 - 4.8.5 + 4.8.4 3.125.12