Skip to content

refactor: local message #374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,30 @@ public interface IIntegrationEventLogService
/// </summary>
/// <param name="retryBatchSize">The size of a single event to be retried</param>
/// <param name="maxRetryTimes"></param>
/// <param name="minimumRetryInterval">default: 60s</param>
/// <param name="minimumRetryInterval">Minimum retry interval (unit: s)</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(
int retryBatchSize = 200,
int maxRetryTimes = 10,
int minimumRetryInterval = 60,
int retryBatchSize,
int maxRetryTimes,
int minimumRetryInterval,
CancellationToken cancellationToken = default);

/// <summary>
/// Retrieve pending messages
/// </summary>
/// <param name="batchSize">The maximum number of messages retrieved each time</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsPendingToPublishAsync(
int batchSize,
CancellationToken cancellationToken = default);

Task SaveEventAsync(IIntegrationEvent @event, DbTransaction transaction, CancellationToken cancellationToken = default);

Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);

Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval,CancellationToken cancellationToken = default);
Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default);

Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);

Expand All @@ -34,5 +44,5 @@ Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(
/// <param name="batchCount"></param>
/// <param name="token"></param>
/// <returns></returns>
Task DeleteExpiresAsync(DateTime expiresAt, int batchCount = 1000, CancellationToken token = default);
Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, CancellationToken token = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void TestTransaction()
}

[TestMethod]
public async Task TestUseTranscationAsync()
public async Task TestUseTransactionAsync()
{
_options.Object.UseUoW<CustomDbContext>(options => options.UseTestSqlite(Connection));
var serviceProvider = _options.Object.Services.BuildServiceProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,21 @@ public IntegrationEventLogService(
/// </summary>
/// <param name="retryBatchSize">maximum number of retries per retry</param>
/// <param name="maxRetryTimes"></param>
/// <param name="minimumRetryInterval">default: 60s</param>
/// <param name="minimumRetryInterval">Minimum retry interval (unit: s)</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPublishAsync(
int retryBatchSize = 200,
int maxRetryTimes = 10,
int minimumRetryInterval = 60,
int retryBatchSize,
int maxRetryTimes,
int minimumRetryInterval,
CancellationToken cancellationToken = default)
{
//todo: Subsequent acquisition of the current time needs to be uniformly replaced with the unified time method provided by the framework, which is convenient for subsequent uniform replacement to UTC time or other urban time. The default setting here is Utc time.
var time = DateTime.UtcNow.AddSeconds(-minimumRetryInterval);
var result = await _eventLogContext.EventLogs
.Where(e => (e.State == IntegrationEventStates.PublishedFailed || e.State == IntegrationEventStates.InProgress) &&
e.TimesSent <= maxRetryTimes &&
e.ModificationTime < time)
.OrderBy(o => o.CreationTime)
.OrderBy(e => e.CreationTime)
.Take(retryBatchSize)
.ToListAsync(cancellationToken);

Expand All @@ -49,7 +48,34 @@ public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsFailedToPub
_eventTypes ??= _serviceProvider.GetRequiredService<IIntegrationEventBus>().GetAllEventTypes()
.Where(type => typeof(IIntegrationEvent).IsAssignableFrom(type));

return result.OrderBy(o => o.CreationTime)
return result.OrderBy(e => e.CreationTime)
.Select(e => e.DeserializeJsonContent(_eventTypes.First(t => t.Name == e.EventTypeShortName)));
}

return result;
}

/// <summary>
/// Retrieve pending messages
/// </summary>
/// <param name="batchSize">The maximum number of messages retrieved each time</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IEnumerable<IntegrationEventLog>> RetrieveEventLogsPendingToPublishAsync(
int batchSize,
CancellationToken cancellationToken = default)
{
var result = await _eventLogContext.EventLogs
.Where(e => e.State == IntegrationEventStates.NotPublished)
.OrderBy(e => e.CreationTime)
.Take(batchSize)
.ToListAsync(cancellationToken);
if (result.Any())
{
_eventTypes ??= _serviceProvider.GetRequiredService<IIntegrationEventBus>().GetAllEventTypes()
.Where(type => typeof(IIntegrationEvent).IsAssignableFrom(type));

return result.OrderBy(e => e.CreationTime)
.Select(e => e.DeserializeJsonContent(_eventTypes.First(t => t.Name == e.EventTypeShortName)));
}

Expand Down Expand Up @@ -127,7 +153,7 @@ public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationT
}, cancellationToken);
}

public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount = 1000, CancellationToken token = default)
public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, CancellationToken token = default)
{
var eventLogs = _eventLogContext.EventLogs.Where(e => e.ModificationTime < expiresAt && e.State == IntegrationEventStates.Published)
.OrderBy(e => e.CreationTime).Take(batchCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,8 @@ private async Task PublishIntegrationAsync<TEvent>(TEvent @event, CancellationTo
var topicName = @event.Topic;
if (@event.UnitOfWork is { UseTransaction: true } && _eventLogService != null)
{
bool isAdd = false;
try
{
_logger?.LogDebug("----- Saving changes and integrationEvent: {IntegrationEventId}", @event.GetEventId());
await _eventLogService.SaveEventAsync(@event, @event.UnitOfWork!.Transaction, cancellationToken);
isAdd = true;

_logger?.LogDebug(
"----- Publishing integration event: {IntegrationEventIdPublished} from {AppId} - ({IntegrationEvent})",
@event.GetEventId(),
_masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, @event);

await _eventLogService.MarkEventAsInProgressAsync(@event.GetEventId(),
_dispatcherOptions.MinimumRetryInterval,
cancellationToken);

_logger?.LogDebug("Publishing event {Event} to {TopicName}", @event, topicName);
await _publisher.PublishAsync(topicName, (dynamic)@event, cancellationToken);

await _eventLogService.MarkEventAsPublishedAsync(@event.GetEventId(), cancellationToken);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
@event.GetEventId(), _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, @event);
if (!isAdd) throw;

LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(@event.GetEventId(), @event.Topic, @event));
await _eventLogService.MarkEventAsFailedAsync(@event.GetEventId(), cancellationToken);
}
_logger?.LogDebug("----- Saving changes and integrationEvent: {IntegrationEventId}", @event.GetEventId());
await _eventLogService.SaveEventAsync(@event, @event.UnitOfWork!.Transaction, cancellationToken);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftPackageVersion)"/>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftPackageVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\BuildingBlocks\Configuration\Masa.BuildingBlocks.Configuration\Masa.BuildingBlocks.Configuration.csproj" />
<ProjectReference Include="..\..\..\..\BuildingBlocks\Dispatcher\Masa.BuildingBlocks.Dispatcher.IntegrationEvents\Masa.BuildingBlocks.Dispatcher.IntegrationEvents.csproj"/>
<ProjectReference Include="..\..\..\..\BuildingBlocks\Exception\Masa.BuildingBlocks.Exceptions\Masa.BuildingBlocks.Exceptions.csproj"/>
<ProjectReference Include="..\..\..\..\BuildingBlocks\Dispatcher\Masa.BuildingBlocks.Dispatcher.IntegrationEvents\Masa.BuildingBlocks.Dispatcher.IntegrationEvents.csproj" />
<ProjectReference Include="..\..\..\..\BuildingBlocks\Exception\Masa.BuildingBlocks.Exceptions\Masa.BuildingBlocks.Exceptions.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public int LocalFailedRetryInterval
private int _retryBatchSize = 100;

/// <summary>
/// maximum number of retries per retry
/// The maximum number of retrieved messages per retry
/// </summary>
public int RetryBatchSize
{
Expand All @@ -112,6 +112,22 @@ public int RetryBatchSize
}
}

private int _batchSize = 20;

/// <summary>
/// The maximum retrieval status is the number of messages waiting to be sent each time
/// </summary>
public int BatchSize
{
get => _batchSize;
set
{
MasaArgumentException.ThrowIfLessThanOrEqual(value, 0, nameof(BatchSize));

_batchSize = value;
}
}

private int _cleaningLocalQueueExpireInterval = 60;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

namespace Masa.Contrib.Dispatcher.IntegrationEvents.Processor;

public class SendByDataProcessor : ProcessorBase
{
private readonly IOptions<DispatcherOptions> _options;
private readonly IOptionsMonitor<MasaAppConfigureOptions>? _masaAppConfigureOptions;
private readonly ILogger<SendByDataProcessor>? _logger;

public override int Delay => 1;

public SendByDataProcessor(
IServiceProvider serviceProvider,
IOptions<DispatcherOptions> options,
IOptionsMonitor<MasaAppConfigureOptions>? masaAppConfigureOptions = null,
ILogger<SendByDataProcessor>? logger = null) : base(serviceProvider)
{
_masaAppConfigureOptions = masaAppConfigureOptions;
_options = options;
_logger = logger;
}

protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
await eventLogService.RetrieveEventLogsPendingToPublishAsync(
_options.Value.BatchSize,
stoppingToken);

foreach (var eventLog in retrieveEventLogs)
{
try
{
await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.Event.Topic);

await publisher.PublishAsync(eventLog.Event.Topic, eventLog.Event, stoppingToken);

await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention, no processing required
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken);

LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Event.Topic, eventLog.Event));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ internal static IServiceCollection TryAddIntegrationEventBus(
{
services.AddSingleton<IProcessor, RetryByDataProcessor>();
services.AddSingleton<IProcessor, RetryByLocalQueueProcessor>();
services.AddSingleton<IProcessor, SendByDataProcessor>();
services.AddSingleton<IProcessor, DeletePublishedExpireEventProcessor>();
services.AddSingleton<IProcessor, DeleteLocalQueueExpiresProcessor>();
}
Expand Down
Loading