diff --git a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs index 710902068..23d52a514 100644 --- a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs +++ b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs @@ -23,7 +23,7 @@ Task> RetrieveEventLogsFailedToPublishAsync( Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default); - Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default); + Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval,CancellationToken cancellationToken = default); Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default); diff --git a/src/Contrib/Data/UoW/Tests/Masa.Contrib.Data.UoW.EFCore.Tests/UnitOfWorkTest.cs b/src/Contrib/Data/UoW/Tests/Masa.Contrib.Data.UoW.EFCore.Tests/UnitOfWorkTest.cs index 206cf751e..f391c0b6c 100644 --- a/src/Contrib/Data/UoW/Tests/Masa.Contrib.Data.UoW.EFCore.Tests/UnitOfWorkTest.cs +++ b/src/Contrib/Data/UoW/Tests/Masa.Contrib.Data.UoW.EFCore.Tests/UnitOfWorkTest.cs @@ -83,7 +83,7 @@ public async Task TestCommitAsync() { Name = "Tom" }; - var transcation = uoW.Transaction; + var transaction = uoW.Transaction; dbContext.User.Add(user); uoW.EntityState = EntityState.Changed; await uoW.SaveChangesAsync(); @@ -102,7 +102,7 @@ public async Task TestOpenRollbackAsync() await dbContext.Database.EnsureCreatedAsync(); var uoW = serviceProvider.GetRequiredService(); var user = new Users(); - var transcation = uoW.Transaction; + var transaction = uoW.Transaction; dbContext.User.Add(user); await uoW.CommitAsync(); @@ -119,7 +119,7 @@ public async Task TestAddLoggerAndOpenRollbackAsync() await dbContext.Database.EnsureCreatedAsync(); var uoW = serviceProvider.GetRequiredService(); var user = new Users(); - var transcation = uoW.Transaction; + var transaction = uoW.Transaction; dbContext.User.Add(user); await uoW.CommitAsync(); diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs index 49b712d79..b72b1556a 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs @@ -86,11 +86,22 @@ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellati }, cancellationToken); } - public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default) + public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default) { return UpdateEventStatus(eventId, IntegrationEventStates.InProgress, eventLog => { - if (eventLog.State != IntegrationEventStates.NotPublished && eventLog.State != IntegrationEventStates.PublishedFailed) + if (eventLog.State is IntegrationEventStates.InProgress or IntegrationEventStates.PublishedFailed && + (eventLog.GetCurrentTime() - eventLog.ModificationTime).TotalSeconds < minimumRetryInterval) + { + _logger?.LogInformation( + "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}, Multitasking execution error, waiting for the next retry", + IntegrationEventStates.InProgress, eventLog.State, eventLog.Id); + throw new UserFriendlyException( + $"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry"); + } + if (eventLog.State != IntegrationEventStates.NotPublished && + eventLog.State != IntegrationEventStates.InProgress && + eventLog.State != IntegrationEventStates.PublishedFailed) { _logger?.LogWarning( "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}", @@ -135,7 +146,8 @@ private async Task UpdateEventStatus(Guid eventId, Action? action = null, CancellationToken cancellationToken = default) { - var eventLogEntry = await _eventLogContext.EventLogs.FirstOrDefaultAsync(e => e.EventId == eventId, cancellationToken: cancellationToken); + var eventLogEntry = + await _eventLogContext.EventLogs.FirstOrDefaultAsync(e => e.EventId == eventId, cancellationToken: cancellationToken); if (eventLogEntry == null) throw new ArgumentException( $"The local message record does not exist, please confirm whether the local message record has been deleted or other reasons cause the local message record to not be inserted successfully In EventId: {eventId}", diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IntegrationEventBus.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IntegrationEventBus.cs index efa2f1e07..130536c53 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IntegrationEventBus.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IntegrationEventBus.cs @@ -73,7 +73,9 @@ private async Task PublishIntegrationAsync(TEvent @event, CancellationTo @event.GetEventId(), _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, @event); - await _eventLogService.MarkEventAsInProgressAsync(@event.GetEventId(),cancellationToken); + await _eventLogService.MarkEventAsInProgressAsync(@event.GetEventId(), + _dispatcherOptions.MinimumRetryInterval, + cancellationToken); _logger?.LogDebug("Publishing event {Event} to {TopicName}", @event, topicName); await _publisher.PublishAsync(topicName, (dynamic)@event, cancellationToken); diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs index 8c83630f1..ea41b7337 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs @@ -25,46 +25,49 @@ public RetryByDataProcessor( protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) { var unitOfWork = serviceProvider.GetService(); - if (unitOfWork != null) - unitOfWork.UseTransaction = false; + if (unitOfWork != null) + unitOfWork.UseTransaction = false; - var publisher = serviceProvider.GetRequiredService(); - var eventLogService = serviceProvider.GetRequiredService(); + var publisher = serviceProvider.GetRequiredService(); + var eventLogService = serviceProvider.GetRequiredService(); - var retrieveEventLogs = - await eventLogService.RetrieveEventLogsFailedToPublishAsync(_options.Value.RetryBatchSize, _options.Value.MaxRetryTimes, - _options.Value.MinimumRetryInterval, stoppingToken); + var retrieveEventLogs = + await eventLogService.RetrieveEventLogsFailedToPublishAsync( + _options.Value.RetryBatchSize, + _options.Value.MaxRetryTimes, + _options.Value.MinimumRetryInterval, + stoppingToken); - foreach (var eventLog in retrieveEventLogs) + foreach (var eventLog in retrieveEventLogs) + { + try { - try - { - if (LocalQueueProcessor.Default.IsExist(eventLog.EventId)) - continue; // The local queue is retrying, no need to retry + if (LocalQueueProcessor.Default.IsExist(eventLog.EventId)) + continue; // The local queue is retrying, no need to retry - await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, stoppingToken); + await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, _options.Value.MinimumRetryInterval, stoppingToken); - _logger?.LogDebug("Publishing integration event {Event} to {TopicName}", - eventLog, - eventLog.Event.Topic); + _logger?.LogDebug("Publishing integration event {Event} to {TopicName}", + eventLog, + eventLog.Event.Topic); - await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken); + await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken); - LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); + LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); - await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken); - } - catch (UserFriendlyException) - { - //Update state due to multitasking contention, no processing required - } - catch (Exception ex) - { - _logger?.LogError(ex, - "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", - eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); - await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken); - } + await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken); } + catch (UserFriendlyException) + { + //Update state due to multitasking contention, no processing required + } + catch (Exception ex) + { + _logger?.LogError(ex, + "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", + eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); + await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken); + } + } } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs index d7a12c8d6..052a97696 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs @@ -25,47 +25,47 @@ public RetryByLocalQueueProcessor( protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) { var unitOfWork = serviceProvider.GetService(); - if (unitOfWork != null) - unitOfWork.UseTransaction = false; + if (unitOfWork != null) + unitOfWork.UseTransaction = false; - var publisher = serviceProvider.GetRequiredService(); - var eventLogService = serviceProvider.GetRequiredService(); + var publisher = serviceProvider.GetRequiredService(); + var eventLogService = serviceProvider.GetRequiredService(); - var retrieveEventLogs = - LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes, - _options.Value.RetryBatchSize); + var retrieveEventLogs = + LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes, + _options.Value.RetryBatchSize); - foreach (var eventLog in retrieveEventLogs) + foreach (var eventLog in retrieveEventLogs) + { + try { - try - { - LocalQueueProcessor.Default.RetryJobs(eventLog.EventId); + LocalQueueProcessor.Default.RetryJobs(eventLog.EventId); - await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId); + await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, _options.Value.MinimumRetryInterval, stoppingToken); - _logger?.LogDebug( - "Publishing integration event {Event} to {TopicName}", - eventLog, - eventLog.Topic); + _logger?.LogDebug( + "Publishing integration event {Event} to {TopicName}", + eventLog, + eventLog.Topic); - await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken); + await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken); - await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId); + await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken); - LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); - } - catch (UserFriendlyException) - { - //Update state due to multitasking contention - LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); - } - catch (Exception ex) - { - _logger?.LogError(ex, - "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", - eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); - await eventLogService.MarkEventAsFailedAsync(eventLog.EventId); - } + LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); } + catch (UserFriendlyException) + { + //Update state due to multitasking contention + LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); + } + catch (Exception ex) + { + _logger?.LogError(ex, + "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", + eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); + await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken); + } + } } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr.Tests/IntegrationEventBusTest.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr.Tests/IntegrationEventBusTest.cs index b46280848..b482e926f 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr.Tests/IntegrationEventBusTest.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr.Tests/IntegrationEventBusTest.cs @@ -24,7 +24,7 @@ public void Initialize() .Returns(() => new DispatcherOptions(_options.Object.Services, AppDomain.CurrentDomain.GetAssemblies())); _eventLog = new(); _eventLog.Setup(eventLog => eventLog.SaveEventAsync(It.IsAny(), null!, default)).Verifiable(); - _eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny(), default)).Verifiable(); + _eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), default)).Verifiable(); _eventLog.Setup(eventLog => eventLog.MarkEventAsPublishedAsync(It.IsAny(), default)).Verifiable(); _eventLog.Setup(eventLog => eventLog.MarkEventAsFailedAsync(It.IsAny(), default)).Verifiable(); _masaAppConfigureOptions = new(); diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs index bab3be13e..9c51309dc 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore.Tests/IntegrationEventLogServiceTest.cs @@ -99,9 +99,9 @@ public async Task TestSaveEventAsync() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using (var transcation = await response.CustomDbContext.Database.BeginTransactionAsync()) + await using (var transaction = await response.CustomDbContext.Database.BeginTransactionAsync()) { var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent @@ -109,8 +109,8 @@ public async Task TestSaveEventAsync() OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); - await transcation.CommitAsync(); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); + await transaction.CommitAsync(); } Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 1); @@ -124,56 +124,81 @@ public async Task TestSaveEventByExceptionAsync() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); await Assert.ThrowsExceptionAsync(async () => { - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); throw new Exception("custom exception"); }, "custom exception"); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); } [TestMethod] public async Task TestMarkEventAsInProgressAsync() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); - await logService.MarkEventAsInProgressAsync(@event.Id); + await logService.MarkEventAsInProgressAsync(@event.Id, 10); Assert.IsTrue(await response.CustomDbContext.Set() .CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1); - await transcation.CommitAsync(); + await transaction.CommitAsync(); Assert.IsTrue(await response.CustomDbContext.Set() .CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1); Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 1); } + + [TestMethod] + public async Task TestMultiJobMarkEventAsInProgressAsync() + { + var response = await InitializeAsync(); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); + + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); + var logService = response.ServiceProvider.GetRequiredService(); + var @event = new OrderPaymentSucceededIntegrationEvent + { + OrderId = "1234567890123", + PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds + }; + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); + + await logService.MarkEventAsInProgressAsync(@event.Id, 10); + Assert.IsTrue(await response.CustomDbContext.Set() + .CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1); + await transaction.CommitAsync(); + + await Assert.ThrowsExceptionAsync(async () + => await logService.MarkEventAsInProgressAsync(@event.Id, 60, default)); + } + [TestMethod] public async Task TestMarkEventAsInProgress2Async() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); @@ -189,29 +214,29 @@ await response.CustomDbContext.Set().AddAsync(new Integrati await response.CustomDbContext.SaveChangesAsync(); - await Assert.ThrowsExceptionAsync(async () => await logService.MarkEventAsInProgressAsync(@event.Id)); + await Assert.ThrowsExceptionAsync(async () => await logService.MarkEventAsInProgressAsync(@event.Id, 10)); } [TestMethod] public async Task TestMarkEventAsPublishedAsync() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); - await logService.MarkEventAsInProgressAsync(@event.Id); + await logService.MarkEventAsInProgressAsync(@event.Id, 10); await logService.MarkEventAsPublishedAsync(@event.Id); - await transcation.CommitAsync(); + await transaction.CommitAsync(); Assert.IsTrue(await response.CustomDbContext.Set() .CountAsync(log => log.State == IntegrationEventStates.Published) == 1); @@ -222,16 +247,16 @@ public async Task TestMarkEventAsPublishedAsync() public async Task TestMarkEventAsPublished2Async() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); await Assert.ThrowsExceptionAsync(async () => await logService.MarkEventAsPublishedAsync(@event.Id)); } @@ -240,37 +265,38 @@ public async Task TestMarkEventAsPublished2Async() public async Task TestMarkEventAsFailedAsync() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); - await logService.MarkEventAsInProgressAsync(@event.Id); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); + await logService.MarkEventAsInProgressAsync(@event.Id, 10); await logService.MarkEventAsFailedAsync(@event.Id); - await transcation.CommitAsync(); + await transaction.CommitAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync(log => log.State == IntegrationEventStates.PublishedFailed) == 1); + Assert.IsTrue(await response.CustomDbContext.Set() + .CountAsync(log => log.State == IntegrationEventStates.PublishedFailed) == 1); } [TestMethod] public async Task TestMarkEventAsFailed2Async() { var response = await InitializeAsync(); - Assert.IsTrue(await response.CustomDbContext.Set().CountAsync() == 0); + Assert.IsFalse(await response.CustomDbContext.Set().AnyAsync()); - await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync(); + await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync(); var logService = response.ServiceProvider.GetRequiredService(); var @event = new OrderPaymentSucceededIntegrationEvent { OrderId = "1234567890123", PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds }; - await logService.SaveEventAsync(@event, transcation.GetDbTransaction()); + await logService.SaveEventAsync(@event, transaction.GetDbTransaction()); await Assert.ThrowsExceptionAsync(async () => await logService.MarkEventAsFailedAsync(@event.Id)); } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs index a03be4556..567c0ab79 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs @@ -15,7 +15,7 @@ public Task DeleteExpiresAsync(DateTime expiresAt, int batchCount = 1000, Cancel throw new NotImplementedException(); } - public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default) + public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default) { return Task.CompletedTask; } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/IntegrationEventBusTest.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/IntegrationEventBusTest.cs index 4b71ef9e6..71d07670c 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/IntegrationEventBusTest.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/IntegrationEventBusTest.cs @@ -28,7 +28,7 @@ public void Initialize() _logger = new(); _eventLog = new(); _eventLog.Setup(eventLog => eventLog.SaveEventAsync(It.IsAny(), null!, default)).Verifiable(); - _eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny(), default)).Verifiable(); + _eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), default)).Verifiable(); _eventLog.Setup(eventLog => eventLog.MarkEventAsPublishedAsync(It.IsAny(), default)).Verifiable(); _eventLog.Setup(eventLog => eventLog.MarkEventAsFailedAsync(It.IsAny(), default)).Verifiable(); _masaAppConfigureOptions = new(); @@ -108,7 +108,7 @@ public async Task TestNotUseUoWAndLoggerAsync() .Verifiable(); await integrationEventBus.PublishAsync(@event); - _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Never); + _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny(), default), Times.Never); _publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default), Times.Once); _eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Never); @@ -136,7 +136,7 @@ public async Task TestNotUseTransactionAsync() .Verifiable(); await integrationEventBus.PublishAsync(@event); - _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Never); + _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny(), default), Times.Never); _publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default), Times.Once); _eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Never); @@ -163,7 +163,7 @@ public async Task TestUseTranscationAndNotUseLoggerAsync() .Verifiable(); await integrationEventBus.PublishAsync(@event); - _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Once); + _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny(), default), Times.Once); _publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default), Times.Once); _eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Once); @@ -214,7 +214,7 @@ public async Task TestPublishIntegrationEventAndFailedAsync() .Verifiable(); await integrationEventBus.PublishAsync(@event); - _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Once); + _eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny(), default), Times.Once); _publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default), Times.Once); _eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Once); diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/ProcessorTest.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/ProcessorTest.cs index 7b8be4151..d0d3653af 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/ProcessorTest.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/ProcessorTest.cs @@ -57,7 +57,7 @@ public async Task RetryByDataProcessorExecuteTestAsync() Mock integrationEventLogService = new(); RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent(); - integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token)).Verifiable(); + integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token)).Verifiable(); integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token)).Verifiable(); List list = new List() @@ -107,7 +107,7 @@ public async Task RetryByDataProcessorExecuteTestAsync() serviceProvider.GetService>()); await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token); - integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); + integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); } @@ -121,7 +121,7 @@ public async Task RetryByDataProcessorExecute2TestAsync() cancellationTokenSource.CancelAfter(1000); Mock integrationEventLogService = new(); - integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token)) + integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token)) .Verifiable(); integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token)) .Verifiable(); @@ -191,7 +191,7 @@ public async Task RetryByDataProcessorExecute2TestAsync() serviceProvider.GetService>()); await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token); - integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); + integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token), Times.Never); integrationEventLogService.Verify(service => service.MarkEventAsFailedAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(1)); } @@ -205,7 +205,7 @@ public async Task RetryByDataProcessorExecute2AndNotUseLoggerTestAsync() cancellationTokenSource.CancelAfter(1000); Mock integrationEventLogService = new(); - integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token)).Verifiable(); + integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token)).Verifiable(); integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token)).Verifiable(); integrationEventLogService.Setup(service => service.MarkEventAsFailedAsync(It.IsAny(), cancellationTokenSource.Token)).Verifiable(); @@ -271,7 +271,7 @@ public async Task RetryByDataProcessorExecute2AndNotUseLoggerTestAsync() serviceProvider.GetService>()); await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token); - integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); + integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny(), It.IsAny(), cancellationTokenSource.Token), Times.Exactly(2)); integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny(), cancellationTokenSource.Token), Times.Never); integrationEventLogService.Verify(service => service.MarkEventAsFailedAsync(It.IsAny(), cancellationTokenSource.Token), Times.Exactly(1)); }