-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SQL query isolation level #6654
Add SQL query isolation level #6654
Conversation
public static class DbConnectionExtensions | ||
{ | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static async Task ExecuteInTransaction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These extension methods wraps a query delegate inside a controlled transaction
} | ||
|
||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static async Task<T> ExecuteInTransaction<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These extension methods wraps a query delegate inside a controlled transaction.
This is the wrapper version that returns a value.
=> config.GetString(key).ToIsolationLevel(); | ||
|
||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public static IsolationLevel ToIsolationLevel(this string level) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convenience method to convert HOCON/string to IsolationLevel
WriteIsolationLevel = namingConventions.WriteIsolationLevel; | ||
|
||
// backward compatibility | ||
var level = config.GetString("isolation-level"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Backward compatibility for older codes
@@ -862,25 +940,24 @@ private void TryProcess() | |||
{ | |||
_remainingOperations--; | |||
|
|||
var chunk = DequeueChunk(_remainingOperations); | |||
var (chunk, isWrite) = DequeueChunk(_remainingOperations); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isWrite
is a flag to signal if the current batch chunk is a read or a write chunk.
DequeueChunk
automatically separate the read and write requests in the queue, we need to pull the read and write information of the current batch chunk from there.
.PipeTo(Self, failure: ex => new ChunkExecutionFailure(ex, chunk.Requests, chunk.ChunkId)); | ||
} | ||
} | ||
|
||
private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext context) | ||
private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext context, bool isWriteOperation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to tell ExecuteBatch
the read/write context of the chunk so we can apply the appropriate read/write isolation level
@@ -1331,7 +1406,7 @@ protected void AddParameter(TCommand command, string paramName, DbType dbType, o | |||
/// Select the buffer that has the smallest id on its first item, retrieve a maximum Setup.MaxBatchSize | |||
/// items from it, and return it as a chunk that needs to be batched | |||
/// </summary> | |||
private RequestChunk DequeueChunk(int chunkId) | |||
private (RequestChunk chunk, bool isWrite) DequeueChunk(int chunkId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isWrite
is a flag to signal if the current batch chunk is a read or a write chunk.
DequeueChunk
automatically separate the read and write requests in the queue, we need to pull the read and write information of the current batch chunk from there.
@@ -323,19 +331,84 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven | |||
/// <param name="replayFilterSettings">The settings used when replaying events from database back to the persistent actors.</param> | |||
/// <param name="namingConventions">The naming conventions used by the database to construct valid SQL statements.</param> | |||
/// <param name="defaultSerializer">The serializer used when no specific type matching can be found.</param> | |||
protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, string defaultSerializer) | |||
[Obsolete("Use constructor with separate read and write isolation level instead. (since v1.5.2)")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking old constructors as obsolete
@@ -207,6 +217,7 @@ public class QueryConfiguration | |||
/// <param name="timeout">TBD</param> | |||
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param> | |||
/// <param name="useSequentialAccess">Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.</param> | |||
[Obsolete("Use .ctor that accepts read and write IsolationLevel instead (since v1.5.2)")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking old constructors as obsolete
{ | ||
using (var command = GetCommand(connection, AllPersistenceIdsSql)) | ||
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Converts old code into delegate and tuck them inside the transaction wrapper.
There should be zero impact from these wrap, the code is marked with aggresive inlining so that they are unrolled as soon as possible either during compile or inside JIT.
No logic were changed, we're just wrapping them inside explicit transaction.
{ | ||
using (var command = GetCommand(connection, ByPersistenceIdSql)) | ||
await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, wrap the old code inside transaction.
@@ -27,7 +27,7 @@ public abstract class SqlJournal : AsyncWriteJournal, IWithUnboundedStash | |||
private IImmutableDictionary<string, long> _tagSequenceNr = ImmutableDictionary<string, long>.Empty; | |||
|
|||
private readonly CancellationTokenSource _pendingRequestsCancellation; | |||
private readonly JournalSettings _settings; | |||
protected readonly JournalSettings Settings; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expose JournalSettings
as protected so inheriting classes can use them.
@@ -132,6 +143,7 @@ public class QueryConfiguration | |||
/// <param name="timeout">TBD</param> | |||
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param> | |||
/// <param name="useSequentialAccess">Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.</param> | |||
[Obsolete("Use the constructor that takes read and write isolation level argument (since v1.5.2)")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark old constructor as obsolete for backward compatibility
@@ -38,7 +38,7 @@ private sealed class Initialized | |||
/// </summary> | |||
private readonly CancellationTokenSource _pendingRequestsCancellation; | |||
|
|||
private readonly SnapshotStoreSettings _settings; | |||
protected readonly SnapshotStoreSettings Settings; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expose SnapshotStoreSettings
as protected so inheriting classes can use them
@@ -31,6 +31,14 @@ | |||
# timestamp provider used for generation of journal entries timestamps | |||
timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common" | |||
|
|||
# The isolation level of all database read query | |||
# Valid values: "chaos", "read-committed", "read-uncommitted", "repeatable-read", "serializable", "snapshot", or "unspecified" | |||
read-isolation-level = unspecified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These new isolation level settings need to be propagated to all Sql.Common
implementation when they are upgraded to the next Akka.NET
release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference between chaos
and unspecified
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The differences are covered here: https://learn.microsoft.com/en-us/dotnet/api/system.data.isolationlevel?#fields
|
||
# The isolation level of all database write query | ||
# Valid values: "chaos", "read-committed", "read-uncommitted", "repeatable-read", "serializable", "snapshot", or "unspecified" | ||
write-isolation-level = unspecified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These new isolation level settings need to be propagated to all Sql.Common
implementation when they are upgraded to the next Akka.NET
release.
/// </summary> | ||
public IsolationLevel IsolationLevel { get; } | ||
[Obsolete("Use WriteIsolationLevel property instead")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make old property obsolete, old isolation level is used for both read and write, we now have fine grain control over both.
var stopwatch = new Stopwatch(); | ||
using (var connection = CreateConnection(Setup.ConnectionString)) | ||
{ | ||
await connection.OpenAsync(); | ||
|
||
// In the grand scheme of thing, using a transaction in an all read batch operation | ||
// should not hurt performance by much, because it is done only once at the start. | ||
using (var tx = connection.BeginTransaction(Setup.IsolationLevel)) | ||
using (var tx = connection.BeginTransaction(isWriteOperation ? _writeIsolationLevel : _readIsolationLevel)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The kind of isolation level is controlled via the write flag argument
…akka.net into add_sql_query_isolation_level
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Fixes #6640
Changes