Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(IntegrationEventBus): When repairing multiple copies, reduce the duplication of event publishing due to concurrency #344

Merged
merged 5 commits into from
Nov 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(

Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);

Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default);
Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval,CancellationToken cancellationToken = default);

Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);

Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ public async Task TestCommitAsync()
{
Name = "Tom"
};
var transcation = uoW.Transaction;
var transaction = uoW.Transaction;
dbContext.User.Add(user);
uoW.EntityState = EntityState.Changed;
await uoW.SaveChangesAsync();
@@ -102,7 +102,7 @@ public async Task TestOpenRollbackAsync()
await dbContext.Database.EnsureCreatedAsync();
var uoW = serviceProvider.GetRequiredService<IUnitOfWork>();
var user = new Users();
var transcation = uoW.Transaction;
var transaction = uoW.Transaction;
dbContext.User.Add(user);
await uoW.CommitAsync();

@@ -119,7 +119,7 @@ public async Task TestAddLoggerAndOpenRollbackAsync()
await dbContext.Database.EnsureCreatedAsync();
var uoW = serviceProvider.GetRequiredService<IUnitOfWork>();
var user = new Users();
var transcation = uoW.Transaction;
var transaction = uoW.Transaction;
dbContext.User.Add(user);
await uoW.CommitAsync();

Original file line number Diff line number Diff line change
@@ -86,11 +86,22 @@ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellati
}, cancellationToken);
}

public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default)
public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default)
{
return UpdateEventStatus(eventId, IntegrationEventStates.InProgress, eventLog =>
{
if (eventLog.State != IntegrationEventStates.NotPublished && eventLog.State != IntegrationEventStates.PublishedFailed)
if (eventLog.State is IntegrationEventStates.InProgress or IntegrationEventStates.PublishedFailed &&
(eventLog.GetCurrentTime() - eventLog.ModificationTime).TotalSeconds < minimumRetryInterval)
{
_logger?.LogInformation(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}, Multitasking execution error, waiting for the next retry",
IntegrationEventStates.InProgress, eventLog.State, eventLog.Id);
throw new UserFriendlyException(
$"Failed to modify the state of the local message table to {IntegrationEventStates.InProgress}, the current State is {eventLog.State}, Id: {eventLog.Id}, Multitasking execution error, waiting for the next retry");
}
if (eventLog.State != IntegrationEventStates.NotPublished &&
eventLog.State != IntegrationEventStates.InProgress &&
eventLog.State != IntegrationEventStates.PublishedFailed)
{
_logger?.LogWarning(
"Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}",
@@ -135,7 +146,8 @@ private async Task UpdateEventStatus(Guid eventId,
Action<IntegrationEventLog>? action = null,
CancellationToken cancellationToken = default)
{
var eventLogEntry = await _eventLogContext.EventLogs.FirstOrDefaultAsync(e => e.EventId == eventId, cancellationToken: cancellationToken);
var eventLogEntry =
await _eventLogContext.EventLogs.FirstOrDefaultAsync(e => e.EventId == eventId, cancellationToken: cancellationToken);
if (eventLogEntry == null)
throw new ArgumentException(
$"The local message record does not exist, please confirm whether the local message record has been deleted or other reasons cause the local message record to not be inserted successfully In EventId: {eventId}",
Original file line number Diff line number Diff line change
@@ -73,7 +73,9 @@ private async Task PublishIntegrationAsync<TEvent>(TEvent @event, CancellationTo
@event.GetEventId(),
_masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, @event);

await _eventLogService.MarkEventAsInProgressAsync(@event.GetEventId(),cancellationToken);
await _eventLogService.MarkEventAsInProgressAsync(@event.GetEventId(),
_dispatcherOptions.MinimumRetryInterval,
cancellationToken);

_logger?.LogDebug("Publishing event {Event} to {TopicName}", @event, topicName);
await _publisher.PublishAsync(topicName, (dynamic)@event, cancellationToken);
Original file line number Diff line number Diff line change
@@ -25,46 +25,49 @@ public RetryByDataProcessor(
protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;
if (unitOfWork != null)
unitOfWork.UseTransaction = false;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();
var publisher = serviceProvider.GetRequiredService<IPublisher>();
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
await eventLogService.RetrieveEventLogsFailedToPublishAsync(_options.Value.RetryBatchSize, _options.Value.MaxRetryTimes,
_options.Value.MinimumRetryInterval, stoppingToken);
var retrieveEventLogs =
await eventLogService.RetrieveEventLogsFailedToPublishAsync(
_options.Value.RetryBatchSize,
_options.Value.MaxRetryTimes,
_options.Value.MinimumRetryInterval,
stoppingToken);

foreach (var eventLog in retrieveEventLogs)
foreach (var eventLog in retrieveEventLogs)
{
try
{
try
{
if (LocalQueueProcessor.Default.IsExist(eventLog.EventId))
continue; // The local queue is retrying, no need to retry
if (LocalQueueProcessor.Default.IsExist(eventLog.EventId))
continue; // The local queue is retrying, no need to retry

await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, stoppingToken);
await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.Event.Topic);
_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.Event.Topic);

await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken);
await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken);

LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);

await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention, no processing required
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);
}
await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention, no processing required
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -25,47 +25,47 @@ public RetryByLocalQueueProcessor(
protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;
if (unitOfWork != null)
unitOfWork.UseTransaction = false;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();
var publisher = serviceProvider.GetRequiredService<IPublisher>();
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes,
_options.Value.RetryBatchSize);
var retrieveEventLogs =
LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes,
_options.Value.RetryBatchSize);

foreach (var eventLog in retrieveEventLogs)
foreach (var eventLog in retrieveEventLogs)
{
try
{
try
{
LocalQueueProcessor.Default.RetryJobs(eventLog.EventId);
LocalQueueProcessor.Default.RetryJobs(eventLog.EventId);

await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId);
await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug(
"Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.Topic);
_logger?.LogDebug(
"Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.Topic);

await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken);
await publisher.PublishAsync(eventLog.Topic, eventLog.Event, stoppingToken);

await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId);
await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);

LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention
LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId);
}
LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention
LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ public void Initialize()
.Returns(() => new DispatcherOptions(_options.Object.Services, AppDomain.CurrentDomain.GetAssemblies()));
_eventLog = new();
_eventLog.Setup(eventLog => eventLog.SaveEventAsync(It.IsAny<IIntegrationEvent>(), null!, default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny<Guid>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsPublishedAsync(It.IsAny<Guid>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsFailedAsync(It.IsAny<Guid>(), default)).Verifiable();
_masaAppConfigureOptions = new();
Original file line number Diff line number Diff line change
@@ -99,18 +99,18 @@ public async Task TestSaveEventAsync()
{
var response = await InitializeAsync();

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using (var transcation = await response.CustomDbContext.Database.BeginTransactionAsync())
await using (var transaction = await response.CustomDbContext.Database.BeginTransactionAsync())
{
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await transcation.CommitAsync();
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());
await transaction.CommitAsync();
}

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 1);
@@ -124,56 +124,81 @@ public async Task TestSaveEventByExceptionAsync()
{
var response = await InitializeAsync();

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await Assert.ThrowsExceptionAsync<Exception>(async () =>
{
await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());
throw new Exception("custom exception");
}, "custom exception");

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());
}

[TestMethod]
public async Task TestMarkEventAsInProgressAsync()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());

await logService.MarkEventAsInProgressAsync(@event.Id);
await logService.MarkEventAsInProgressAsync(@event.Id, 10);
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>()
.CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1);
await transcation.CommitAsync();
await transaction.CommitAsync();

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>()
.CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1);
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 1);
}


[TestMethod]
public async Task TestMultiJobMarkEventAsInProgressAsync()
{
var response = await InitializeAsync();
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());

await logService.MarkEventAsInProgressAsync(@event.Id, 10);
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>()
.CountAsync(log => log.State == IntegrationEventStates.InProgress) == 1);
await transaction.CommitAsync();

await Assert.ThrowsExceptionAsync<UserFriendlyException>(async ()
=> await logService.MarkEventAsInProgressAsync(@event.Id, 60, default));
}

[TestMethod]
public async Task TestMarkEventAsInProgress2Async()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();


@@ -189,29 +214,29 @@ await response.CustomDbContext.Set<IntegrationEventLog>().AddAsync(new Integrati

await response.CustomDbContext.SaveChangesAsync();

await Assert.ThrowsExceptionAsync<UserFriendlyException>(async () => await logService.MarkEventAsInProgressAsync(@event.Id));
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async () => await logService.MarkEventAsInProgressAsync(@event.Id, 10));
}

[TestMethod]
public async Task TestMarkEventAsPublishedAsync()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());

await logService.MarkEventAsInProgressAsync(@event.Id);
await logService.MarkEventAsInProgressAsync(@event.Id, 10);

await logService.MarkEventAsPublishedAsync(@event.Id);

await transcation.CommitAsync();
await transaction.CommitAsync();

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>()
.CountAsync(log => log.State == IntegrationEventStates.Published) == 1);
@@ -222,16 +247,16 @@ public async Task TestMarkEventAsPublishedAsync()
public async Task TestMarkEventAsPublished2Async()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());

await Assert.ThrowsExceptionAsync<UserFriendlyException>(async () => await logService.MarkEventAsPublishedAsync(@event.Id));
}
@@ -240,37 +265,38 @@ public async Task TestMarkEventAsPublished2Async()
public async Task TestMarkEventAsFailedAsync()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.MarkEventAsInProgressAsync(@event.Id);
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());
await logService.MarkEventAsInProgressAsync(@event.Id, 10);
await logService.MarkEventAsFailedAsync(@event.Id);
await transcation.CommitAsync();
await transaction.CommitAsync();

Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync(log => log.State == IntegrationEventStates.PublishedFailed) == 1);
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>()
.CountAsync(log => log.State == IntegrationEventStates.PublishedFailed) == 1);
}

[TestMethod]
public async Task TestMarkEventAsFailed2Async()
{
var response = await InitializeAsync();
Assert.IsTrue(await response.CustomDbContext.Set<IntegrationEventLog>().CountAsync() == 0);
Assert.IsFalse(await response.CustomDbContext.Set<IntegrationEventLog>().AnyAsync());

await using var transcation = await response.CustomDbContext.Database.BeginTransactionAsync();
await using var transaction = await response.CustomDbContext.Database.BeginTransactionAsync();
var logService = response.ServiceProvider.GetRequiredService<IIntegrationEventLogService>();
var @event = new OrderPaymentSucceededIntegrationEvent
{
OrderId = "1234567890123",
PaymentTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds
};
await logService.SaveEventAsync(@event, transcation.GetDbTransaction());
await logService.SaveEventAsync(@event, transaction.GetDbTransaction());
await Assert.ThrowsExceptionAsync<UserFriendlyException>(async () => await logService.MarkEventAsFailedAsync(@event.Id));
}

Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ public Task DeleteExpiresAsync(DateTime expiresAt, int batchCount = 1000, Cancel
throw new NotImplementedException();
}

public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default)
public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ public void Initialize()
_logger = new();
_eventLog = new();
_eventLog.Setup(eventLog => eventLog.SaveEventAsync(It.IsAny<IIntegrationEvent>(), null!, default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny<Guid>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsPublishedAsync(It.IsAny<Guid>(), default)).Verifiable();
_eventLog.Setup(eventLog => eventLog.MarkEventAsFailedAsync(It.IsAny<Guid>(), default)).Verifiable();
_masaAppConfigureOptions = new();
@@ -108,7 +108,7 @@ public async Task TestNotUseUoWAndLoggerAsync()
.Verifiable();
await integrationEventBus.PublishAsync(@event);

_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Never);
_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny<int>(), default), Times.Never);
_publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default),
Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Never);
@@ -136,7 +136,7 @@ public async Task TestNotUseTransactionAsync()
.Verifiable();
await integrationEventBus.PublishAsync(@event);

_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Never);
_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny<int>(), default), Times.Never);
_publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default),
Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Never);
@@ -163,7 +163,7 @@ public async Task TestUseTranscationAndNotUseLoggerAsync()
.Verifiable();
await integrationEventBus.PublishAsync(@event);

_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny<int>(), default), Times.Once);
_publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default),
Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Once);
@@ -214,7 +214,7 @@ public async Task TestPublishIntegrationEventAndFailedAsync()
.Verifiable();
await integrationEventBus.PublishAsync(@event);

_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), default), Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsInProgressAsync(@event.GetEventId(), It.IsAny<int>(), default), Times.Once);
_publisher.Verify(client => client.PublishAsync(@event.Topic, @event, default),
Times.Once);
_eventLog.Verify(eventLog => eventLog.MarkEventAsPublishedAsync(@event.GetEventId(), default), Times.Once);
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ public async Task RetryByDataProcessorExecuteTestAsync()
Mock<IIntegrationEventLogService> integrationEventLogService = new();
RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent();

integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token)).Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token)).Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token)).Verifiable();

List<IntegrationEventLog> list = new List<IntegrationEventLog>()
@@ -107,7 +107,7 @@ public async Task RetryByDataProcessorExecuteTestAsync()
serviceProvider.GetService<ILogger<RetryByDataProcessor>>());
await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token);

integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(2));
}

@@ -121,7 +121,7 @@ public async Task RetryByDataProcessorExecute2TestAsync()
cancellationTokenSource.CancelAfter(1000);

Mock<IIntegrationEventLogService> integrationEventLogService = new();
integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token))
integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token))
.Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token))
.Verifiable();
@@ -191,7 +191,7 @@ public async Task RetryByDataProcessorExecute2TestAsync()
serviceProvider.GetService<ILogger<RetryByDataProcessor>>());
await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token);

integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Never);
integrationEventLogService.Verify(service => service.MarkEventAsFailedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(1));
}
@@ -205,7 +205,7 @@ public async Task RetryByDataProcessorExecute2AndNotUseLoggerTestAsync()
cancellationTokenSource.CancelAfter(1000);

Mock<IIntegrationEventLogService> integrationEventLogService = new();
integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token)).Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token)).Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token)).Verifiable();
integrationEventLogService.Setup(service => service.MarkEventAsFailedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token)).Verifiable();

@@ -271,7 +271,7 @@ public async Task RetryByDataProcessorExecute2AndNotUseLoggerTestAsync()
serviceProvider.GetService<ILogger<RetryByDataProcessor>>());
await retryByDataProcessor.ExecuteAsync(cancellationTokenSource.Token);

integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsInProgressAsync(It.IsAny<Guid>(), It.IsAny<int>(), cancellationTokenSource.Token), Times.Exactly(2));
integrationEventLogService.Verify(service => service.MarkEventAsPublishedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Never);
integrationEventLogService.Verify(service => service.MarkEventAsFailedAsync(It.IsAny<Guid>(), cancellationTokenSource.Token), Times.Exactly(1));
}