Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/entityframework: Support optimistic concurrency #79

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public void OnExecuting(ChangeTracker changeTracker)
return;

changeTracker.DetectChanges();
foreach (var entity in changeTracker.Entries().Where(entry => entry.State == EntityState.Deleted))
var entries = changeTracker.Entries().Where(entry => entry.State == EntityState.Deleted);
foreach (var entity in entries)
{
if (entity.Entity is ISoftDelete)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

namespace Masa.Contrib.Data.EntityFrameworkCore;

public static class EntityTypeBuilderExtensions
{
private const int _maxLength = 36;

public static void TryConfigureConcurrencyStamp(
this EntityTypeBuilder entityTypeBuilder,
string? propertyName)
=> entityTypeBuilder.TryConfigureConcurrencyStamp(_maxLength, propertyName);

public static void TryConfigureConcurrencyStamp(
this EntityTypeBuilder entityTypeBuilder,
int maxLength = _maxLength,
string? propertyName = null)
{
if (entityTypeBuilder.Metadata.ClrType.IsAssignableTo(typeof(IHasConcurrencyStamp)))
{
entityTypeBuilder.Property(nameof(IHasConcurrencyStamp.RowVersion))
.IsConcurrencyToken()
.HasMaxLength(maxLength)
.HasColumnName(propertyName ?? nameof(IHasConcurrencyStamp.RowVersion));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="$(MicrosoftPackageVersion)" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="$(MicrosoftPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="$(MicrosoftPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="$(MicrosoftPackageVersion)" />
</ItemGroup>
Expand Down
17 changes: 16 additions & 1 deletion src/Data/Masa.Contrib.Data.EntityFrameworkCore/MasaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public abstract class MasaDbContext : DbContext, IMasaDbContext
protected readonly IDataFilter? DataFilter;
protected readonly MasaDbContextOptions Options;
protected IDomainEventBus? DomainEventBus => Options.ServiceProvider.GetService<IDomainEventBus>();
private IConcurrencyStampProvider _concurrencyStampProvider => Options.ServiceProvider.GetRequiredService<IConcurrencyStampProvider>();

public MasaDbContext(MasaDbContextOptions options) : base(options)
{
Expand Down Expand Up @@ -101,6 +102,7 @@ protected virtual void OnBeforeSaveChanges()
{
if (Options != null)
{
UpdateRowVesion(ChangeTracker);
OnBeforeSaveChangesByFilters();
DomainEventEnqueueAsync(ChangeTracker).ConfigureAwait(false).GetAwaiter().GetResult();
}
Expand All @@ -110,6 +112,7 @@ protected virtual async Task OnBeforeSaveChangesAsync()
{
if (Options != null)
{
UpdateRowVesion(ChangeTracker);
OnBeforeSaveChangesByFilters();
await DomainEventEnqueueAsync(ChangeTracker);
}
Expand Down Expand Up @@ -150,6 +153,17 @@ protected virtual async Task DomainEventEnqueueAsync(ChangeTracker changeTracker
await DomainEventBus.Enqueue(domainEvent);
}

protected virtual void UpdateRowVesion(ChangeTracker changeTracker)
{
var entries = changeTracker.Entries().Where(entry
=> (entry.State == EntityState.Added || entry.State == EntityState.Modified || entry.State == EntityState.Deleted) &&
entry.Entity is IHasConcurrencyStamp);
foreach (var entity in entries)
{
entity.CurrentValues[nameof(IHasConcurrencyStamp.RowVersion)] = _concurrencyStampProvider.GetRowVersion();
}
}

/// <summary>
/// Automatic soft delete.
/// <inheritdoc/>
Expand Down Expand Up @@ -181,7 +195,8 @@ public MasaDbContext(MasaDbContextOptions<TDbContext> options) : base(options)

protected override void OnModelCreatingConfigureGlobalFilters(ModelBuilder modelBuilder)
{
var methodInfo = typeof(MasaDbContext<TDbContext>).GetMethod(nameof(ConfigureGlobalFilters), BindingFlags.NonPublic | BindingFlags.Instance);
var methodInfo =
typeof(MasaDbContext<TDbContext>).GetMethod(nameof(ConfigureGlobalFilters), BindingFlags.NonPublic | BindingFlags.Instance);

foreach (var entityType in modelBuilder.Model.GetEntityTypes())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private static IServiceCollection AddCoreServices<TDbContextImplementation>(
ServiceLifetime optionsLifetime)
where TDbContextImplementation : MasaDbContext, IMasaDbContext
{
services.TryAddSingleton<IConcurrencyStampProvider, DefaultConcurrencyStampProvider>();
services.TryAddScoped<IConnectionStringProvider, DefaultConnectionStringProvider>();
services.TryAddSingleton<IDbConnectionStringProvider, DbConnectionStringProvider>();

Expand Down
1 change: 1 addition & 0 deletions src/Data/Masa.Contrib.Data.EntityFrameworkCore/_Imports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
global using Microsoft.EntityFrameworkCore.ChangeTracking;
global using Microsoft.EntityFrameworkCore.Infrastructure;
global using Microsoft.EntityFrameworkCore.Metadata;
global using Microsoft.EntityFrameworkCore.Metadata.Builders;
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ private void ConfigureEventLogEntry(EntityTypeBuilder<IntegrationEventLog> build
builder.Property(e => e.TimesSent)
.IsRequired();

builder.Property(e => e.RowVersion)
builder.Property(nameof(IHasConcurrencyStamp.RowVersion))
.IsConcurrencyToken()
.HasMaxLength(36)
.IsRequired();
.HasColumnName(nameof(IHasConcurrencyStamp.RowVersion));

builder.Property(e => e.EventTypeName)
.IsRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public IntegrationEventLogService(
/// <param name="maxRetryTimes"></param>
/// <param name="minimumRetryInterval">default: 60s</param>
/// <returns></returns>
public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(int retryBatchSize = 200, int maxRetryTimes = 10, int minimumRetryInterval = 60)
public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(int retryBatchSize = 200,
int maxRetryTimes = 10, int minimumRetryInterval = 60)
{
//todo: Subsequent acquisition of the current time needs to be uniformly replaced with the unified time method provided by the framework, which is convenient for subsequent uniform replacement to UTC time or other urban time. The default setting here is Utc time.
var time = DateTime.UtcNow.AddSeconds(-minimumRetryInterval);
Expand Down Expand Up @@ -75,7 +76,8 @@ public Task MarkEventAsPublishedAsync(Guid eventId)
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.Published, eventLog.State, eventLog.Id);
throw new UserFriendlyException($"Failed to modify the state of the local message table to {IntegrationEventStates.Published}, the current State is {eventLog.State}, Id: {eventLog.Id}");
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.Published}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});
}
Expand All @@ -89,7 +91,8 @@ public Task MarkEventAsInProgressAsync(Guid eventId)
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
throw new UserFriendlyException($"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}");
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});
}
Expand All @@ -103,7 +106,8 @@ public Task MarkEventAsFailedAsync(Guid eventId)
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
IntegrationEventStates.PublishedFailed, eventLog.State, eventLog.Id);
throw new UserFriendlyException($"Failed to modify the state of the local message table to {IntegrationEventStates.PublishedFailed}, the current State is {eventLog.State}, Id: {eventLog.Id}");
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.PublishedFailed}, the current State is {eventLog.State}, Id: {eventLog.Id}");
}
});
}
Expand All @@ -124,7 +128,7 @@ public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount = 1000,

private async Task UpdateEventStatus(Guid eventId, IntegrationEventStates status, Action<IntegrationEventLog>? action = null)
{
var eventLogEntry = _eventLogContext.EventLogs.AsNoTracking().FirstOrDefault(e => e.EventId == eventId);
var eventLogEntry = _eventLogContext.EventLogs.FirstOrDefault(e => e.EventId == eventId);
if (eventLogEntry == null)
throw new ArgumentException(nameof(eventId));

Expand All @@ -136,47 +140,27 @@ private async Task UpdateEventStatus(Guid eventId, IntegrationEventStates status
if (status == IntegrationEventStates.InProgress)
eventLogEntry.TimesSent++;

await UpdateAsync(eventLogEntry);
_eventLogContext.EventLogs.Update(eventLogEntry);

try
{
await _eventLogContext.DbContext.SaveChangesAsync();
}
catch (DbUpdateConcurrencyException ex)
{
_logger?.LogWarning(
ex,
"Concurrency error, Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
status, eventLogEntry.State, eventLogEntry.Id);
throw new UserFriendlyException("Concurrency conflict, update exception");
}

CheckAndDetached(eventLogEntry);
}

private void CheckAndDetached(IntegrationEventLog integrationEvent)
{
if (_eventLogContext.DbContext.ChangeTracker.QueryTrackingBehavior != QueryTrackingBehavior.TrackAll)
_eventLogContext.DbContext.Entry(integrationEvent).State = EntityState.Detached;
}

/// <summary>
/// By using SQL statements to handle high concurrency, reduce the dependence on the database
/// Turning on a transactional operation makes the operation atomic
/// Retrying the task operation in the background will not open the transaction, so that the task will only be executed once in high concurrency scenarios
/// </summary>
/// <param name="eventLogEntry"></param>
private async Task UpdateAsync(IntegrationEventLog eventLogEntry)
{
string eventIdColumn = _eventLogContext.DbContext.GetPropertyName<IntegrationEventLog>(nameof(IntegrationEventLog.EventId));
string stateColumn = _eventLogContext.DbContext.GetPropertyName<IntegrationEventLog>(nameof(IntegrationEventLog.State));
string modificationTimeColumn = _eventLogContext.DbContext.GetPropertyName<IntegrationEventLog>(nameof(IntegrationEventLog.ModificationTime));
string timesSentColumn = _eventLogContext.DbContext.GetPropertyName<IntegrationEventLog>(nameof(IntegrationEventLog.TimesSent));
string rowVersionColumn = _eventLogContext.DbContext.GetPropertyName<IntegrationEventLog>(nameof(IntegrationEventLog.RowVersion));
string tableName = _eventLogContext.DbContext.GetTableName<IntegrationEventLog>();
var newVersion = Guid.NewGuid().ToString();

string updateSql = $"UPDATE { tableName } set [{ stateColumn }] = {{0}}, [{modificationTimeColumn }] = {{1}}, [{ timesSentColumn }] = {{2}}, [{ rowVersionColumn }] = {{3}} where [{ eventIdColumn }] = {{4}} and [{ rowVersionColumn }] = {{5}};";
await ExecuteAsync(updateSql, new object[]
{
(int)eventLogEntry.State,
eventLogEntry.ModificationTime,
eventLogEntry.TimesSent,
newVersion,
eventLogEntry.EventId,
eventLogEntry.RowVersion
});
}

private async Task ExecuteAsync(string updateSql, params object[] parameters)
{
var effectRow = await _eventLogContext.DbContext.Database.ExecuteSqlRawAsync(updateSql, parameters);
if (effectRow == 0)
throw new UserFriendlyException("Concurrency conflict, update exception");
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents;
global using Masa.BuildingBlocks.Dispatcher.IntegrationEvents.Logs;
global using Masa.Contrib.Data.EntityFrameworkCore;
global using Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF.Internal;
global using Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF.Internal.Options;
global using Microsoft.EntityFrameworkCore;
global using Microsoft.EntityFrameworkCore.Metadata;
global using Microsoft.EntityFrameworkCore.Metadata.Builders;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;
global using System;
global using System.Collections.Concurrent;
global using System.Collections.Generic;
global using System.Data.Common;
global using System.Linq;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public IsolationSaveChangesFilter(IServiceProvider serviceProvider)
public void OnExecuting(ChangeTracker changeTracker)
{
changeTracker.DetectChanges();
foreach (var entity in changeTracker.Entries().Where(entry => entry.State == EntityState.Added))
var entries = changeTracker.Entries().Where(entry => entry.State == EntityState.Added);
foreach (var entity in entries)
{
if (entity.Entity is IMultiTenant<TKey> && _tenantContext != null)
{
Expand Down